From 97316efeb88c6aeb15871d1923d331f4bb5b206d Mon Sep 17 00:00:00 2001
From: qweasdzxc227 <1095578930@qq.com>
Date: Sat, 1 Jun 2024 19:08:13 +0800
Subject: [PATCH] 6.1.7.8

---
 .../ArticleSpider/models/es_types.py     |  87 +++--
 ArticleSpider/ArticleSpider/pipelines.py | 307 ++++++++----------
 .../ArticleSpider/spiders/jobbole.py     |   7 +-
 ArticleSpider/main.py                    |   2 +-
 4 files changed, 187 insertions(+), 216 deletions(-)

diff --git a/ArticleSpider/ArticleSpider/models/es_types.py b/ArticleSpider/ArticleSpider/models/es_types.py
index 28645b8..5fd480d 100644
--- a/ArticleSpider/ArticleSpider/models/es_types.py
+++ b/ArticleSpider/ArticleSpider/models/es_types.py
@@ -1,44 +1,43 @@
-# -*- coding: utf-8 -*-
-__author__ = 'bobby'
-
-from datetime import datetime
-from elasticsearch_dsl import DocType, Date, Nested, Boolean, \
-    analyzer, InnerObjectWrapper, Completion, Keyword, Text, Integer
-
-from elasticsearch_dsl.analysis import CustomAnalyzer as _CustomAnalyzer
-
-from elasticsearch_dsl.connections import connections
-
-connections.create_connection(hosts=["localhost"])
-
-
-class CustomAnalyzer(_CustomAnalyzer):
-    def get_analysis_definition(self):
-        return {}
-
-
-ik_analyzer = CustomAnalyzer("ik_max_word", filter=["lowercase"])
-
-
-class ArticleType(DocType):
-    # Jobbole article type
-    suggest = Completion(analyzer=ik_analyzer)
-    title = Text(analyzer="ik_max_word")
-    create_date = Date()
-    url = Keyword()
-    url_object_id = Keyword()
-    front_image_url = Keyword()
-    front_image_path = Keyword()
-    praise_nums = Integer()
-    comment_nums = Integer()
-    fav_nums = Integer()
-    tags = Text(analyzer="ik_max_word")
-    content = Text(analyzer="ik_max_word")
-
-    class Meta:
-        index = "jobbole"
-        doc_type = "article"
-
-
-if __name__ == "__main__":
-    ArticleType.init()
+# -*- coding: utf-8 -*-
+
+from datetime import datetime
+from elasticsearch_dsl import DocType, Date, Nested, Boolean, \
+    analyzer, InnerObjectWrapper, Completion, Keyword, Text, Integer
+
+from elasticsearch_dsl.analysis import CustomAnalyzer as _CustomAnalyzer
+
+from elasticsearch_dsl.connections import connections
+
+connections.create_connection(hosts=["localhost"])
+
+
+class CustomAnalyzer(_CustomAnalyzer):
+    def get_analysis_definition(self):
+        return {}
+
+
+ik_analyzer = CustomAnalyzer("ik_max_word", filter=["lowercase"])
+
+
+class ArticleType(DocType):
+    # Jobbole article type
+    suggest = Completion(analyzer=ik_analyzer)
+    title = Text(analyzer="ik_max_word")
+    create_date = Date()
+    url = Keyword()
+    url_object_id = Keyword()
+    front_image_url = Keyword()
+    front_image_path = Keyword()
+    praise_nums = Integer()
+    comment_nums = Integer()
+    fav_nums = Integer()
+    tags = Text(analyzer="ik_max_word")
+    content = Text(analyzer="ik_max_word")
+
+    class Meta:
+        index = "jobbole"
+        doc_type = "article"
+
+
+if __name__ == "__main__":
+    ArticleType.init()
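Note: ArticleType.init() pushes the mapping above into the "jobbole" index, and the ElasticsearchPipeline in pipelines.py below expects the scraped item to expose a save_to_es() method that this patch does not show. A minimal sketch of such a method, assuming it lives on the ArticleItem class in items.py, mirroring the field assignments of the commented-out block this patch deletes from pipelines.py:

    # Sketch only -- not part of this patch. Field names follow the ArticleType mapping above.
    import scrapy
    from w3lib.html import remove_tags
    from ArticleSpider.models.es_types import ArticleType

    class ArticleItem(scrapy.Item):
        # field declarations omitted; they correspond to the ArticleType mapping

        def save_to_es(self):
            article = ArticleType()
            article.title = self['title']
            article.create_date = self['create_date']
            article.content = remove_tags(self['content'])
            article.front_image_url = self['front_image_url']
            if 'front_image_path' in self:
                article.front_image_path = self['front_image_path']
            article.praise_nums = self['praise_nums']
            article.fav_nums = self['fav_nums']
            article.comment_nums = self['comment_nums']
            article.url = self['url']
            article.tags = self['tags']
            article.meta.id = self['url_object_id']
            article.save()
            return self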
diff --git a/ArticleSpider/ArticleSpider/pipelines.py b/ArticleSpider/ArticleSpider/pipelines.py
index 2f41cd0..425e4d6 100644
--- a/ArticleSpider/ArticleSpider/pipelines.py
+++ b/ArticleSpider/ArticleSpider/pipelines.py
@@ -1,169 +1,138 @@
-# Define your item pipelines here
-#
-# Don't forget to add your pipeline to the ITEM_PIPELINES setting
-# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
-
-
-# useful for handling different item types with a single interface
-from itemadapter import ItemAdapter
-from scrapy.pipelines.images import ImagesPipeline
-from scrapy.http.request import Request
-from ArticleSpider.models.es_types import ArticleType
-import codecs
-import json
-from w3lib.html import remove_tags
-from scrapy.exporters import JsonItemExporter
-import MySQLdb
-from twisted.enterprise import adbapi
-from MySQLdb.cursors import DictCursor
-
-
-
-class ArticlespiderPipeline(object):
-    def process_item(self, item, spider):
-        return item
-
-
-class MysqlPipeline(object):
-    def __init__(self):
-        self.conn = MySQLdb.connect('127.0.0.1', 'root', 'qweasdzxc227', 'article_spider', charset="utf8",
-                                    use_unicode=True)
-        self.cursor = self.conn.cursor()
-
-    def process_item(self, item, spider):
-        insert_sql = """
-            insert into jobbole_article(title, url ,url_object_id, front_image_url, front_image_path, parise_nums, comment_nums, fav_nums, tags, content, create_date)
-            values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE parise_nums=VALUES(parise_nums)
-        """
-        params = list()
-        params.append(item.get("title", ""))
-        params.append(item.get("url", ""))
-        params.append(item.get("url_object_id", ""))
-        front_image = ','.join(item.get("front_image_url", []))
-        params.append(front_image)
-        params.append(item.get("front_image_path", ""))
-        params.append(item.get("parise_nums", 0))
-        params.append(item.get("comment_nums", 0))
-        params.append(item.get("fav_nums", 0))
-        params.append(item.get("tags", ""))
-        params.append(item.get("content", ""))
-        params.append(item.get("create_date", "1970-07-01"))
-        self.cursor.execute(insert_sql, tuple(params))
-        self.conn.commit()
-        return item
-
-
-class MysqlTwistedPipline(object):
-    def __init__(self, dbpool):
-        self.dbpool = dbpool
-
-    @classmethod
-    def from_settings(cls, settings):
-        dbparms = dict(
-            host=settings["MYSQL_HOST"],
-            db=settings["MYSQL_DBNAME"],
-            user=settings["MYSQL_USER"],
-            passwd=settings["MYSQL_PASSWORD"],
-            charset='utf8',
-            cursorclass=DictCursor,
-            use_unicode=True,
-        )
-        dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)
-
-        return cls(dbpool)
-
-    def process_item(self, item, spider):
-        # use twisted to run the MySQL insert asynchronously
-        query = self.dbpool.runInteraction(self.do_insert, item)
-        query.addErrback(self.handle_error, item, spider)  # handle errors
-
-    def handle_error(self, failure, item, spider):
-        # handle exceptions from the asynchronous insert
-        print(failure)
-
-    def do_insert(self, cursor, item):
-        # perform the actual insert
-        insert_sql = """
-            insert into jobbole_article(title, url ,url_object_id, front_image_url, front_image_path, parise_nums, comment_nums, fav_nums, tags, content, create_date)
-            values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE parise_nums=VALUES(parise_nums)
-        """
-        params = list()
-        params.append(item.get("title", ""))
-        params.append(item.get("url", ""))
-        params.append(item.get("url_object_id", ""))
-        front_image = ','.join(item.get("front_image_url", []))
-        params.append(front_image)
-        params.append(item.get("front_image_path", ""))
-        params.append(item.get("parise_nums", 0))
-        params.append(item.get("comment_nums", 0))
-        params.append(item.get("fav_nums", 0))
-        params.append(item.get("tags", ""))
-        params.append(item.get("content", ""))
-        params.append(item.get("create_date", "1970-07-01"))
-        # build the SQL statement for this item and insert it into MySQL
-        cursor.execute(insert_sql, tuple(params))
-
-
-class JsonWithEncodingPipeline(object):
-    # custom export to a JSON file
-    def __init__(self):
-        self.file = codecs.open('article.json', 'a', encoding="utf-8")
-
-    def process_item(self, item, spider):
-        lines = json.dumps(dict(item), ensure_ascii=False) + "\n"
-        self.file.write(lines)
-        return item
-
-    def spider_closed(self, spider):
-        self.file.close()
-
-
-class JsonExporterPipeline(object):
-    def __init__(self):
-        self.file = open('articleexport.json', 'wb')
-        self.exporter = JsonItemExporter(self.file, encoding="utf-8", ensure_ascii=False)
-        self.exporter.start_exporting()
-
-    def close_spider(self, spider):
-        self.exporter.finish_exporting()
-        self.file.close()
-
-    def process_item(self, item, spider):
-        self.exporter.export_item(item)
-
-
-class ArticleImagePipeline(ImagesPipeline):
-    def item_completed(self, results, item, info):
-        try:
-            if "front_image_url" in item:
-                image_file_path = ''
-                for ok, value in results:
-                    image_file_path = value["path"]
-                item["front_image_path"] = image_file_path
-                return item
-        except Exception as e:
-            print(e)
-            item['front_image_path'] = '图片不可用'
-            return item
-
-
-class ElasticsearchPipeline(object):
-    # write the data into Elasticsearch
-    def process_item(self, item, spider):
-        # article = ArticleType()
-        # article.title = item['title']
-        # article.create_date = item['create_date']
-        # article.content = remove_tags(item['content'])
-        # article.front_image_url = item['front_image_url']
-        # if 'front_image_path' in item:
-        #     article.front_image_path = item['front_image_path']
-        # article.praise_nums = item['praise_nums']
-        # article.fav_nums = item['fav_nums']
-        # article.comment_nums = item['comment_nums']
-        # article.url = item['url']
-        # article.tags = item['tags']
-        # article.meta.id = item['url_object_id']
-        # article.save()
-        # convert the item into ES data
-        item.save_to_es()
-        return item
+# Define your item pipelines here
+#
+# Don't forget to add your pipeline to the ITEM_PIPELINES setting
+# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
+
+
+# useful for handling different item types with a single interface
+from itemadapter import ItemAdapter
+from scrapy.pipelines.images import ImagesPipeline
+from scrapy.http.request import Request
+from ArticleSpider.models.es_types import ArticleType
+import codecs
+import json
+from w3lib.html import remove_tags
+from scrapy.exporters import JsonItemExporter
+import MySQLdb
+from twisted.enterprise import adbapi
+from MySQLdb.cursors import DictCursor
+
+
+
+class ArticlespiderPipeline(object):
+    def process_item(self, item, spider):
+        return item
+
+
+class MysqlPipeline(object):
+    def __init__(self):
+        self.conn = MySQLdb.connect('127.0.0.1', 'root', 'qweasdzxc227', 'article_spider', charset="utf8",
+                                    use_unicode=True)
+        self.cursor = self.conn.cursor()
+
+    def process_item(self, item, spider):
+        insert_sql = """
+            insert into jobbole_article(title, url ,url_object_id, front_image_url, front_image_path, parise_nums, comment_nums, fav_nums, tags, content, create_date)
+            values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE parise_nums=VALUES(parise_nums)
+        """
+        params = list()
+        params.append(item.get("title", ""))
+        params.append(item.get("url", ""))
+        params.append(item.get("url_object_id", ""))
+        front_image = ','.join(item.get("front_image_url", []))
+        params.append(front_image)
+        params.append(item.get("front_image_path", ""))
+        params.append(item.get("parise_nums", 0))
+        params.append(item.get("comment_nums", 0))
+        params.append(item.get("fav_nums", 0))
+        params.append(item.get("tags", ""))
+        params.append(item.get("content", ""))
+        params.append(item.get("create_date", "1970-07-01"))
+        self.cursor.execute(insert_sql, tuple(params))
+        self.conn.commit()
+        return item
+
+
+class MysqlTwistedPipline(object):
+    def __init__(self, dbpool):
+        self.dbpool = dbpool
+
+    @classmethod
+    def from_settings(cls, settings):
+        dbparms = dict(
+            host=settings["MYSQL_HOST"],
+            db=settings["MYSQL_DBNAME"],
+            user=settings["MYSQL_USER"],
+            passwd=settings["MYSQL_PASSWORD"],
+            charset='utf8',
+            cursorclass=DictCursor,
+            use_unicode=True,
+        )
+        dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)
+        return cls(dbpool)
+
+    def process_item(self, item, spider):
+        # use twisted to run the MySQL insert asynchronously
+        query = self.dbpool.runInteraction(self.do_insert, item)
+        query.addErrback(self.handle_error, item, spider)  # handle errors
+
+    def handle_error(self, failure, item, spider):
+        # handle exceptions from the asynchronous insert
+        print(failure)
+
+    def do_insert(self, cursor, item):
+        # perform the actual insert
+        insert_sql = """
+            insert into jobbole_article(title, url ,url_object_id, front_image_url, front_image_path, parise_nums, comment_nums, fav_nums, tags, content, create_date)
+            values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE parise_nums=VALUES(parise_nums)
+        """
+        params = list()
+        params.append(item.get("title", ""))
+        params.append(item.get("url", ""))
+        params.append(item.get("url_object_id", ""))
+        front_image = ','.join(item.get("front_image_url", []))
+        params.append(front_image)
+        params.append(item.get("front_image_path", ""))
+        params.append(item.get("parise_nums", 0))
+        params.append(item.get("comment_nums", 0))
+        params.append(item.get("fav_nums", 0))
+        params.append(item.get("tags", ""))
+        params.append(item.get("content", ""))
+        params.append(item.get("create_date", "1970-07-01"))
+        # build the SQL statement for this item and insert it into MySQL
+        cursor.execute(insert_sql, tuple(params))
+
+
+class JsonWithEncodingPipeline(object):
+    # custom export to a JSON file
+    def __init__(self):
+        self.file = codecs.open('article.json', 'a', encoding="utf-8")
+
+    def process_item(self, item, spider):
+        lines = json.dumps(dict(item), ensure_ascii=False) + "\n"
+        self.file.write(lines)
+        return item
+
+    def spider_closed(self, spider):
+        self.file.close()
+
+
+class JsonExporterPipeline(object):
+    def __init__(self):
+        self.file = open('articleexport.json', 'wb')
+        self.exporter = JsonItemExporter(self.file, encoding="utf-8", ensure_ascii=False)
+        self.exporter.start_exporting()
+
+    def close_spider(self, spider):
+        self.exporter.finish_exporting()
+        self.file.close()
+
+    def process_item(self, item, spider):
+        self.exporter.export_item(item)
+
+
+class ElasticsearchPipeline(object):
+    # write the data into Elasticsearch
+    def process_item(self, item, spider):
+        item.save_to_es()
+        return item
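Note: MysqlTwistedPipline reads its connection parameters from the crawler settings, and both MySQL pipelines write to the jobbole_article table (the parise_nums column name follows the existing schema). A settings.py fragment that would wire these pipelines up -- the values and priorities are illustrative placeholders, not taken from this patch:

    # Illustrative settings.py fragment; replace the placeholder values.
    MYSQL_HOST = "127.0.0.1"
    MYSQL_DBNAME = "article_spider"
    MYSQL_USER = "root"
    MYSQL_PASSWORD = "change_me"

    ITEM_PIPELINES = {
        "ArticleSpider.pipelines.ElasticsearchPipeline": 10,
        "ArticleSpider.pipelines.MysqlTwistedPipline": 20,
    }

Ordering ElasticsearchPipeline before MysqlTwistedPipline matters here because MysqlTwistedPipline.process_item does not return the item, so any pipeline scheduled after it would receive None under Scrapy's default pipeline chaining.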
diff --git a/ArticleSpider/ArticleSpider/spiders/jobbole.py b/ArticleSpider/ArticleSpider/spiders/jobbole.py
index fa0fcc5..56c5adb 100644
--- a/ArticleSpider/ArticleSpider/spiders/jobbole.py
+++ b/ArticleSpider/ArticleSpider/spiders/jobbole.py
@@ -50,6 +50,7 @@ class JobboleSpider(scrapy.Spider):
             cookie_dict[cookie['name']] = cookie['value']
         for url in self.start_urls:
             yield scrapy.Request(url, dont_filter=True, cookies=cookie_dict)
+
     def parse(self, response):
         # 1. extract the article urls from the news list page, hand them to scrapy to download, then call the matching parse callback
         # extract the article link; extract_first() returns the first value
@@ -57,7 +58,8 @@ class JobboleSpider(scrapy.Spider):
         for post_node in post_nodes:
             image_url = "https:" + post_node.css('.entry_summary a img::attr(src)').extract_first("")
             post_url = post_node.css('h2 a::attr(href)').extract_first("")
-            yield Request(url=parse.urljoin(response.url, post_url), meta={'front_image_url': image_url}, callback=self.parse_detail, dont_filter=True)
+            yield Request(url=parse.urljoin(response.url, post_url), meta={'front_image_url': image_url},
+                          callback=self.parse_detail, dont_filter=True)
         # 2. extract the next-page url and hand it to scrapy to download; parse follows up once it finishes
         next_url = response.css('div.pager a:last-child::attr(href)').extract_first("")
         yield Request(url=parse.urljoin(response.url, next_url), callback=self.parse)
@@ -74,9 +76,10 @@ class JobboleSpider(scrapy.Spider):
         item_loader.add_value('url', response.url)
         item_loader.add_value('front_image_url', response.meta.get('front_image_url', ''))
         yield Request(url=parse.urljoin(response.url, "/NewsAjax/GetAjaxNewsInfo?contentId={}".format(post_id)),
-              meta={'article_item': item_loader, 'url':response.url}, callback=self.parse_nums)
+                      meta={'article_item': item_loader, 'url': response.url}, callback=self.parse_nums)
 
     def parse_nums(self, response):
+        # extract the praise, favourite and comment counts
         j_data = json.loads(response.text)
         item_loader = response.meta.get('article_item', "")
         item_loader.add_value('praise_nums', j_data['DiggCount'])
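Note: parse_detail hands the partially filled ItemLoader to parse_nums through response.meta, and parse_nums adds the counters fetched from the NewsAjax endpoint. The patch does not show how the loader is finalised; a typical ending, assuming the loader wraps an ArticleItem, would be:

    # Hypothetical continuation of parse_nums(); the lines shown in the patch are repeated,
    # the ending is an assumed, conventional ItemLoader finish and is not part of this patch.
    def parse_nums(self, response):
        # extract the praise, favourite and comment counts
        j_data = json.loads(response.text)
        item_loader = response.meta.get('article_item', "")
        item_loader.add_value('praise_nums', j_data['DiggCount'])
        # ... add_value calls for the remaining counters ...
        article_item = item_loader.load_item()  # build the item from the collected values
        yield article_item                      # send it through the pipelines above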
diff --git a/ArticleSpider/main.py b/ArticleSpider/main.py
index 2b6e606..cad83ed 100644
--- a/ArticleSpider/main.py
+++ b/ArticleSpider/main.py
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-__author__ = 'bobby'
+
 from scrapy.cmdline import execute