# -*- coding: utf-8 -*-
import os
import sys
import threading
import time

import redis

import downloader
import pipelines
import settings

# Global settings
REDIS_HOST = settings.REDIS_HOST
REDIS_PORT = settings.REDIS_PORT
REDIS_PASSWORD = settings.REDIS_PASSWORD
REDIS_LISTNAME = settings.REDIS_LISTNAME
BASEURL = settings.BASEURL
FILENAME_CSV = settings.FILENAME_CSV

threadLock = threading.Lock()
threadlines = 16  # default number of worker threads; keep it at 20 or below
flag = 1  # 1: the main thread keeps spawning batches, 0: stop

connection_pool = redis.ConnectionPool(host=REDIS_HOST, port=REDIS_PORT,
                                       password=REDIS_PASSWORD,
                                       decode_responses=True)
redisconn = redis.Redis(connection_pool=connection_pool)


def getCategory(url) -> str:
    # Map a URL back to its category name via settings.BASEURL.
    for category, baseurl in BASEURL.items():
        if baseurl in url:
            return category
    print("Can't get a valid baseurl! Check your settings.BASEURL.")
    sys.exit()


def geturlList(baseurl) -> list:
    # Build the list of page URLs to crawl (10 pages, odd page numbers only).
    urlList = []
    for i in range(1, 20, 2):
        urlList.append(baseurl + "&page=" + str(i))
    return urlList


def save2Redis():
    # Push the URLs derived from settings.py into the Redis queue.
    for category, baseurl in BASEURL.items():
        for eachurl in geturlList(baseurl):
            redisconn.rpush(REDIS_LISTNAME, eachurl)
        print("Saving the URLs for '{}' to the Redis queue is done.".format(category))


def isNullRedis() -> bool:
    # True if there are no pending URLs in the Redis queue.
    return redisconn.llen(REDIS_LISTNAME) == 0


def precheck() -> bool:
    # Make sure the Redis queue is populated before crawling.
    while isNullRedis():
        print("No queue was found!\n"
              "Push some urls to the queue using default settings.\n"
              "Continue [c] or Exit [q] ?")
        check = str(input())
        if check == 'c':
            save2Redis()
            return True
        elif check == 'q':
            print("Exit.")
            sys.exit()
        else:
            print("Invalid input!")
    return True


def clearRedis():
    # Empty the Redis queue.
    while not isNullRedis():
        redisconn.lpop(REDIS_LISTNAME)
    print("Redis queue has been cleared.")


def write2csv(category, response):
    # Write one parsed page to the category's CSV file.
    filename_csv = os.path.join(os.getcwd(), "Catalogues", FILENAME_CSV.get(category))
    pipelines.write2csv(response, filename_csv)


class milkSpider(threading.Thread):
    def __init__(self, name, url):
        super().__init__()
        self.name = name
        self.url = url
        self.category = getCategory(url)

    def run(self):
        # Download outside the lock; serialize only the CSV writes.
        self.response = downloader.getsource(self.url)
        with threadLock:
            print("write2csv for '{}' was started.".format(self.url))
            write2csv(self.category, self.response)
            print("{} is done.".format(self.name))


def mainThread(threadlines=threadlines, flag=flag):
    # Spawn up to `threadlines` worker threads per batch (16 by default),
    # then recurse until the Redis queue is drained.
    try:
        threads = []
        for index in range(1, threadlines + 1):
            if isNullRedis():
                print("Redis queue is empty, no more threads will be started.")
                flag = 0
                break
            name = "Thread[" + str(index) + "]"
            print("{} started...\n{}/{}".format(name, index, threadlines))
            url = redisconn.lpop(REDIS_LISTNAME)
            athread = milkSpider(name, url)
            athread.start()
            threads.append(athread)
        for thread in threads:
            thread.join()
        if flag == 1:
            mainThread(threadlines, flag)
    except BaseException as e:
        print(e)
        print("Something went wrong in mainThread; check your Redis queue. Main thread quit.")
        sys.exit()


if __name__ == '__main__':
    clearRedis()


# Local tests below.

def print2console(response):
    # Dump the parsed result to the console.
    pipelines.print2console(response)


def localtest(category):
    # Parse locally cached HTML source instead of downloading.
    fileList = settings.getfileList(settings.FILEPATH.get(category))
    page = 1
    for filename in fileList:
        print("↓↓↓↓↓↓↓↓↓↓\npage " + str(page) + " start at " + time.ctime())
        print("Crawling page " + str(page) + ": " + filename)
        response = pipelines.gethtml(filename, gethtml_mode="cache")  # setting the mode once here is enough
        write2csv(category, response)  # fixed: write2csv also needs the category
        print("page " + str(page) + " sleep at " + time.ctime())
        time.sleep(10)
        print("page " + str(page) + " sleep over at " + time.ctime())
        page += 1