diff --git a/pipelines.py b/pipelines.py new file mode 100644 index 0000000..f4f3fc4 --- /dev/null +++ b/pipelines.py @@ -0,0 +1,80 @@ +# Define your item pipelines here
+#
+# Don't forget to add your pipeline to the ITEM_PIPELINES setting
+# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
+
+
+# useful for handling different item types with a single interface
+from itemadapter import ItemAdapter
+import csv
+import openpyxl
+from pymysql import cursors  # pymysql cursor classes (DictCursor below)
+from twisted.enterprise import adbapi  # Twisted connection pool for asynchronous DB writes
+
+
+# Stores scraped items as an Excel workbook and a CSV copy.
+class JdPipeline(object):
+    def open_spider(self, spider):
+        print('Spider started')
+        # create the workbook per crawl, not at import time as a class attribute
+        self.workbook = openpyxl.Workbook()
+        # reuse the default sheet instead of creating a second, empty one
+        self.sheet1 = self.workbook.active
+        self.sheet1.title = 'sheet1'
+        # header row: price, name, shop
+        self.sheet1.append(['price', 'name', 'shop'])
+        # open jdpc.csv for writing; newline='' avoids blank lines on Windows
+        self.fp = open('./jdpc.csv', 'w', encoding='utf-8', newline='')
+        self.csv_writer = csv.writer(self.fp)
+        self.csv_writer.writerow(['name', 'price', 'shop'])
+
+    def process_item(self, item, spider):
+        price = item['price']
+        name = item['name']
+        shop = item['shop']
+        # append one row per item; append() always writes to the next free row,
+        # so the item's 'count' field and the old for-loop (which overwrote the
+        # same cell repeatedly) are no longer needed
+        self.sheet1.append([price, name, shop])
+        # write the same record to the CSV file
+        self.csv_writer.writerow([name, price, shop])
+        return item
+
+    # runs once when the spider closes
+    def close_spider(self, spider):
+        print('Spider finished')
+        # save the workbook and close the CSV file
+        self.workbook.save('shuju1.xlsx')
+        self.fp.close()
+
+
+# Writes items into MySQL asynchronously through Twisted's adbapi pool.
+class mysqlPipeline(object):
+    def __init__(self):
+        dbparams = {
+            'host': '127.0.0.1',
+            'user': 'root',
+            'password': 'your-database-password',  # placeholder
+            'database': 'jd',
+            'cursorclass': cursors.DictCursor,  # return rows as dicts
+        }
+        self.adpool = adbapi.ConnectionPool('pymysql', **dbparams)
+        self._sql = None
+
+    @property
+    def sql(self):
+        if not self._sql:
+            self._sql = 'insert into jdpc(price,name,shop) values(%s,%s,%s)'
+        return self._sql
+
+    def process_item(self, item, spider):
+        # runInteraction runs insert_item in a thread pool; without it the
+        # insert would block the crawl like a synchronous write
+        defer = self.adpool.runInteraction(self.insert_item, item)
+        defer.addErrback(self.handle_error, item, spider)  # errback receives any DB error
+        return item
+
+    def insert_item(self, cursor, item):
+        cursor.execute(self.sql, [item['price'], item['name'], item['shop']])
+
+    def handle_error(self, error, item, spider):
+        print('-' * 30)
+        print('Error:', error)
+        print('-' * 30)
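
The header comment in the diff notes that these pipelines must be registered in the ITEM_PIPELINES setting before Scrapy will call them. A minimal sketch of the corresponding settings.py entry, assuming the Scrapy project module is named jd (the actual module name does not appear in this diff); the numbers are arbitrary ordering values where lower runs first:

    # settings.py — enable both pipelines; module path "jd" is an assumption
    ITEM_PIPELINES = {
        'jd.pipelines.JdPipeline': 300,      # Excel/CSV export runs first
        'jd.pipelines.mysqlPipeline': 301,   # then the async MySQL insert
    }

Because JdPipeline returns the item from process_item, it is passed on to mysqlPipeline; a pipeline that returns nothing would silently drop items for every stage after it.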