You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
91 lines
2.9 KiB
91 lines
2.9 KiB
# Define your item pipelines here
|
|
#
|
|
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
|
|
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
|
|
|
|
|
|
# useful for handling different item types with a single interface
|
|
from itemadapter import ItemAdapter
|
|
import pymysql
|
|
import csv
|
|
import openpyxl
|
|
from pymysql import cursors #导入mysql的游标类
|
|
from twisted.enterprise import adbapi #导入twisted进行数据异步存储
|
|
|
|
|
|
|
|
#把数据存成excel表格的形式
|
|
class JdPipeline(object):
    """Scrapy item pipeline that mirrors each scraped item into Excel and CSV.

    Every item is expected to provide the keys ``price``, ``name``, ``shop``
    and ``count``.  ``count`` is converted to ``int`` and used as the
    worksheet row the item is written to; row 1 holds the column titles.
    The Excel workbook is saved as ``shuju1.xlsx`` when the spider closes;
    the CSV rows are written to ``./jdpc.csv`` as items arrive.
    """

    # Handle of the CSV output file; opened in open_spider, closed in
    # close_spider.
    fp = None
    # NOTE(review): the workbook is created once, when the class body is
    # executed, so it is shared by every instance of this pipeline.
    workbook = openpyxl.Workbook()

    def open_spider(self, spider):
        """Create the worksheet with its header row and open the CSV file.

        Args:
            spider: The spider being opened (unused).
        """
        print('开始爬虫')
        self.sheet1 = self.workbook.create_sheet('sheet1')
        # Header row of the worksheet: price | name | shop.
        for column, title in enumerate(("price", "name", "shop"), start=1):
            self.sheet1.cell(1, column, value=title)
        # newline="" lets the csv module control line endings itself.
        self.fp = open('./jdpc.csv', 'w', encoding='utf-8', newline="")

    def process_item(self, item, spider):
        """Write one item to the worksheet and append it to the CSV file.

        Args:
            item: Mapping with the keys ``price``, ``name``, ``shop`` and
                ``count``.
            spider: The spider that produced the item (unused).

        Returns:
            The unchanged ``item`` so later pipelines can process it.

        Raises:
            KeyError: If one of the required keys is missing.
            ValueError: If ``count`` cannot be converted to ``int``.
        """
        price = item['price']
        name = item['name']
        shop = item['shop']
        self.count = int(item['count'])

        # The previous implementation looped over range(1, count) but wrote
        # the very same row (``count``) on every pass.  A single guarded
        # write yields the same sheet: nothing is written for count <= 1,
        # which also keeps the header row intact.
        if self.count > 1:
            for column, value in enumerate((price, name, shop), start=1):
                self.sheet1.cell(self.count, column, value=value)

        # Note the CSV column order (name, price, shop) differs from the
        # worksheet order (price, name, shop); kept as in the original.
        csv.writer(self.fp).writerow([name, price, shop])
        return item

    def close_spider(self, spider):
        """Save the workbook and close the CSV file when the spider ends.

        Args:
            spider: The spider being closed (unused).
        """
        print('结束爬虫')
        try:
            self.workbook.save('shuju1.xlsx')
        finally:
            # Release the CSV handle even when saving the workbook fails.
            if self.fp is not None:
                self.fp.close()
|
|
|
|
#异步把数据存入数据库
|
|
class mysqlPipeline(object):
    """Scrapy item pipeline that stores items in MySQL without blocking.

    Inserts are dispatched through a Twisted ``adbapi.ConnectionPool`` that
    wraps ``pymysql``.  Each item must provide the keys ``price``, ``name``
    and ``shop``; they are inserted into the ``jdpc`` table.
    """

    def __init__(self):
        """Create the connection pool from the hard-coded parameters."""
        dbparams = {
            'host': '127.0.0.1',
            'user': 'root',
            # Placeholder value; replace it with the real database password.
            'password': '数据库密码',
            'database': 'jd',
            # Cursor class handed to pymysql for every pooled connection.
            'cursorclass': cursors.DictCursor,
        }
        self.adpool = adbapi.ConnectionPool('pymysql', **dbparams)
        self._sql = None

    @property
    def sql(self):
        """Return the parameterized INSERT statement, building it on first use."""
        if not self._sql:
            self._sql = 'insert into jdpc(price,name,shop) values(%s,%s,%s)'
        return self._sql

    def process_item(self, item, spider):
        """Schedule the insert of ``item`` and pass the item on.

        Args:
            item: Mapping with the keys ``price``, ``name`` and ``shop``.
            spider: The spider that produced the item.

        Returns:
            The unchanged ``item``.  The original returned ``None``, which
            Scrapy would have handed to any later pipeline in place of the
            item.
        """
        # runInteraction runs insert_item through the pool, so the insert
        # does not block item processing.
        defer = self.adpool.runInteraction(self.insert_item, item)
        # Report failed inserts instead of losing them silently.
        defer.addErrback(self.handle_error, item, spider)
        return item

    def insert_item(self, cursor, item):
        """Execute the INSERT for one item on the cursor supplied by the pool.

        Args:
            cursor: Database cursor passed in by ``runInteraction``.
            item: Mapping with the keys ``price``, ``name`` and ``shop``.
        """
        cursor.execute(self.sql, [item['price'], item['name'], item['shop']])

    def handle_error(self, error, item, spider):
        """Print the failure of an insert between two separator lines.

        Args:
            error: The failure delivered to the errback.
            item: The item whose insert failed (unused).
            spider: The spider that produced the item (unused).
        """
        print('-' * 30)
        print('Error:', error)
        print('-' * 30)

    def close_spider(self, spider):
        """Close the connection pool when the spider finishes.

        Args:
            spider: The spider being closed (unused).
        """
        self.adpool.close()
|
|
|
|
|
|
|