commit 97316efeb8 (parent 0cf5632f60)

ArticleSpider/models/es_types.py
@@ -1,44 +1,43 @@
# -*- coding: utf-8 -*-
__author__ = 'bobby'

from datetime import datetime

from elasticsearch_dsl import DocType, Date, Nested, Boolean, \
    analyzer, InnerObjectWrapper, Completion, Keyword, Text, Integer

from elasticsearch_dsl.analysis import CustomAnalyzer as _CustomAnalyzer

from elasticsearch_dsl.connections import connections

connections.create_connection(hosts=["localhost"])


class CustomAnalyzer(_CustomAnalyzer):
    # Return an empty analysis definition so elasticsearch_dsl does not try
    # to define the analyzer itself; ik_max_word is provided server-side by
    # the Elasticsearch IK plugin.
    def get_analysis_definition(self):
        return {}


ik_analyzer = CustomAnalyzer("ik_max_word", filter=["lowercase"])


class ArticleType(DocType):
    # document type for Jobbole (伯乐在线) articles
    suggest = Completion(analyzer=ik_analyzer)
    title = Text(analyzer="ik_max_word")
    create_date = Date()
    url = Keyword()
    url_object_id = Keyword()
    front_image_url = Keyword()
    front_image_path = Keyword()
    praise_nums = Integer()
    comment_nums = Integer()
    fav_nums = Integer()
    tags = Text(analyzer="ik_max_word")
    content = Text(analyzer="ik_max_word")

    class Meta:
        index = "jobbole"
        doc_type = "article"


if __name__ == "__main__":
    ArticleType.init()

ArticleSpider/pipelines.py
@@ -1,169 +1,138 @@
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html


# useful for handling different item types with a single interface
from itemadapter import ItemAdapter
from scrapy.pipelines.images import ImagesPipeline
from scrapy.http.request import Request
from ArticleSpider.models.es_types import ArticleType
import codecs
import json
from w3lib.html import remove_tags
from scrapy.exporters import JsonItemExporter
import MySQLdb
from twisted.enterprise import adbapi
from MySQLdb.cursors import DictCursor


class ArticlespiderPipeline(object):
    # default Scrapy pipeline stub; passes items through unchanged
    def process_item(self, item, spider):
        return item


class MysqlPipeline(object):
    # synchronous MySQL insert; blocks on every item
    def __init__(self):
        self.conn = MySQLdb.connect('127.0.0.1', 'root', 'qweasdzxc227', 'article_spider',
                                    charset="utf8", use_unicode=True)
        self.cursor = self.conn.cursor()

    def process_item(self, item, spider):
        # note: "parise_nums" (sic) matches the column name in the
        # jobbole_article table; the ES mapping spells it praise_nums
        insert_sql = """
            insert into jobbole_article(title, url, url_object_id, front_image_url, front_image_path, parise_nums, comment_nums, fav_nums, tags, content, create_date)
            values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE parise_nums=VALUES(parise_nums)
        """
        params = list()
        params.append(item.get("title", ""))
        params.append(item.get("url", ""))
        params.append(item.get("url_object_id", ""))
        front_image = ','.join(item.get("front_image_url", []))
        params.append(front_image)
        params.append(item.get("front_image_path", ""))
        params.append(item.get("parise_nums", 0))
        params.append(item.get("comment_nums", 0))
        params.append(item.get("fav_nums", 0))
        params.append(item.get("tags", ""))
        params.append(item.get("content", ""))
        params.append(item.get("create_date", "1970-07-01"))
        self.cursor.execute(insert_sql, tuple(params))
        self.conn.commit()
        return item
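
The insert statement above implies a jobbole_article table with a unique key on
one of its columns (the ON DUPLICATE KEY UPDATE clause relies on it; using
url_object_id as the primary key matches how the ES pipeline uses it as the
document id). A sketch of a compatible schema; only the column names come from
the query, the types and key choice are assumptions:

create table jobbole_article (
    title varchar(200) not null,
    url varchar(300) not null,
    url_object_id varchar(50) not null primary key,
    front_image_url varchar(300),
    front_image_path varchar(200),
    parise_nums int not null default 0,
    comment_nums int not null default 0,
    fav_nums int not null default 0,
    tags varchar(200),
    content longtext not null,
    create_date date
) charset=utf8;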


class MysqlTwistedPipline(object):
    # asynchronous MySQL insert via Twisted's adbapi connection pool
    def __init__(self, dbpool):
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls, settings):
        dbparms = dict(
            host=settings["MYSQL_HOST"],
            db=settings["MYSQL_DBNAME"],
            user=settings["MYSQL_USER"],
            passwd=settings["MYSQL_PASSWORD"],
            charset='utf8',
            cursorclass=DictCursor,
            use_unicode=True,
        )
        dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)
        return cls(dbpool)

    def process_item(self, item, spider):
        # use Twisted to run the MySQL insert asynchronously
        query = self.dbpool.runInteraction(self.do_insert, item)
        query.addErrback(self.handle_error, item, spider)  # handle failures
        return item  # keep the item flowing to any later pipelines

    def handle_error(self, failure, item, spider):
        # handle exceptions raised by the asynchronous insert
        print(failure)

    def do_insert(self, cursor, item):
        # perform the actual insert; runs on a thread from the pool
        insert_sql = """
            insert into jobbole_article(title, url, url_object_id, front_image_url, front_image_path, parise_nums, comment_nums, fav_nums, tags, content, create_date)
            values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE parise_nums=VALUES(parise_nums)
        """
        params = list()
        params.append(item.get("title", ""))
        params.append(item.get("url", ""))
        params.append(item.get("url_object_id", ""))
        front_image = ','.join(item.get("front_image_url", []))
        params.append(front_image)
        params.append(item.get("front_image_path", ""))
        params.append(item.get("parise_nums", 0))
        params.append(item.get("comment_nums", 0))
        params.append(item.get("fav_nums", 0))
        params.append(item.get("tags", ""))
        params.append(item.get("content", ""))
        params.append(item.get("create_date", "1970-07-01"))
        # build the SQL for this item and insert it into MySQL
        cursor.execute(insert_sql, tuple(params))
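
The from_settings classmethod above pulls its connection parameters from
Scrapy's settings. A minimal sketch of the matching settings.py entries; the
key names come from the code, and the values echo the hard-coded MysqlPipeline
connection (password masked):

# settings.py (sketch)
MYSQL_HOST = "127.0.0.1"
MYSQL_DBNAME = "article_spider"
MYSQL_USER = "root"
MYSQL_PASSWORD = "********"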


class JsonWithEncodingPipeline(object):
    # custom export of items to a JSON-lines file
    def __init__(self):
        self.file = codecs.open('article.json', 'a', encoding="utf-8")

    def process_item(self, item, spider):
        lines = json.dumps(dict(item), ensure_ascii=False) + "\n"
        self.file.write(lines)
        return item

    def close_spider(self, spider):
        # Scrapy calls close_spider() automatically when the spider finishes;
        # a method named spider_closed would only run if connected to the
        # spider_closed signal
        self.file.close()


class JsonExporterPipeline(object):
    # export items with Scrapy's built-in JsonItemExporter
    def __init__(self):
        self.file = open('articleexport.json', 'wb')
        self.exporter = JsonItemExporter(self.file, encoding="utf-8", ensure_ascii=False)
        self.exporter.start_exporting()

    def close_spider(self, spider):
        self.exporter.finish_exporting()
        self.file.close()

    def process_item(self, item, spider):
        self.exporter.export_item(item)
        return item  # keep the item flowing to any later pipelines


class ArticleImagePipeline(ImagesPipeline):
    # record the local path of the downloaded cover image on the item
    def item_completed(self, results, item, info):
        try:
            if "front_image_url" in item:
                image_file_path = ''
                for ok, value in results:
                    # value is a dict with a "path" key on success; on a
                    # failed download it is a Failure, and indexing it raises,
                    # which the except branch below turns into a placeholder
                    image_file_path = value["path"]
                item["front_image_path"] = image_file_path
            return item
        except Exception as e:
            print(e)
            item['front_image_path'] = '图片不可用'  # "image unavailable"
            return item
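
ArticleImagePipeline extends Scrapy's ImagesPipeline, which only downloads
anything when the image settings are configured. A sketch of the usual wiring;
the store path is an assumption, the field name comes from the item keys used
above:

# settings.py (sketch)
IMAGES_URLS_FIELD = "front_image_url"  # item field holding the image URLs
IMAGES_STORE = "images"                # directory where files are saved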


class ElasticsearchPipeline(object):
    # write the item into Elasticsearch
    def process_item(self, item, spider):
        # article = ArticleType()
        # article.title = item['title']
        # article.create_date = item['create_date']
        # article.content = remove_tags(item['content'])
        # article.front_image_url = item['front_image_url']
        # if 'front_image_path' in item:
        #     article.front_image_path = item['front_image_path']
        # article.praise_nums = item['praise_nums']
        # article.fav_nums = item['fav_nums']
        # article.comment_nums = item['comment_nums']
        # article.url = item['url']
        # article.tags = item['tags']
        # article.meta.id = item['url_object_id']
        # article.save()

        # convert the item into an ES document and save it
        item.save_to_es()
        return item
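
Per the header comment in pipelines.py, each pipeline must be registered in
ITEM_PIPELINES to run. A sketch of one possible configuration; the priority
numbers are assumptions (lower runs first), and normally only one of the
MySQL/JSON variants would be enabled at a time:

# settings.py (sketch)
ITEM_PIPELINES = {
    'ArticleSpider.pipelines.ArticleImagePipeline': 1,
    'ArticleSpider.pipelines.MysqlTwistedPipline': 2,
    'ArticleSpider.pipelines.ElasticsearchPipeline': 3,
}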