# -*- coding:UTF-8 -*-
"""
Accepts URLs and crawls the corresponding web pages.
"""
import threading
from abc import ABC
from multiprocessing import Process, Queue, Event
from time import sleep, time

import requests

from FakeUAGetter import my_fake_ua


class GetPage:
    """
    Base class for page fetchers: tasks are taken from _task_queue and
    results are written to _result_queue.
    """

    def __init__(self):
        self._task_queue = None
        self._result_queue = None


class GetPageByWeb(GetPage, ABC):
    """
    Base class for fetching pages from the web.
    """

    @classmethod
    def get_page_context(cls, url, timeout, *args) -> tuple:
        """
        Crawl the given web page.

        :param url: the URL to crawl
        :param timeout: timeout passed to requests; a single number or a
                        (connect, read) tuple
        :param args: extra values passed through unchanged into the result
        :return: ('success', page text, *args) on success,
                 ('error', url, *args) on failure
        """
        header = {"User-Agent": my_fake_ua.random}
        try:
            page = requests.get(url, headers=header, timeout=timeout)
            page.encoding = 'utf-8'
            # FIXME: stopgap - treat an empty body as an error.
            # Some anti-crawler setups return a blank page with HTTP 200.
            if page.text:
                result = ('success', page.text, *args)
            else:
                result = ('error', url, *args)
        except (requests.exceptions.ConnectionError,
                requests.exceptions.Timeout,
                requests.exceptions.HTTPError):
            result = ('error', url, *args)
        return result


class GetPageByWebWithAnotherProcessAndMultiThreading(Process, GetPageByWeb):
    """
    Starts a separate process and crawls pages with multiple threads inside it.

    Send crawl tasks to task_queue and set exit_sign once all tasks have been
    submitted. After every task has finished, the process clears exit_sign and
    exits once the items left in result_queue have been consumed.
    """
    # Seconds of continuous connection failures before the user is shown a
    # network-down notice.
    SHOW_NETWORK_DOWN_LIMIT_TIME = 3

    def __init__(self, task_queue: Queue, result_queue: Queue, exit_sign: Event, network_health: Event):
        super().__init__()
        self._task_queue = task_queue
        self._result_queue = result_queue
        self._threading_pool = list()
        self._exit_when_task_queue_empty = exit_sign
        self._max_threading_number = 2
        self._record_network_down_last_time = None
        self._network_health = network_health
        self._timeout = 3

    def add_task(self, task):
        self._task_queue.put(task)

    def get_result(self) -> Queue:
        return self._result_queue

    def get_page_context_and_return_in_queue(self, url, *args):
        result = super().get_page_context(url, self._timeout, *args)
        if result[0] == 'success':
            # Additive increase: allow one more worker thread after a success.
            self._max_threading_number += 1
            # A success breaks the failure streak, so reset the down-timer
            # and withdraw the network-down notice if it is showing.
            self._record_network_down_last_time = None
            if self._network_health.is_set():
                self._network_health.clear()
        else:
            # Multiplicative decrease: halve the thread limit on failure,
            # never dropping below one worker.
            self._max_threading_number = self._max_threading_number >> 1 if self._max_threading_number > 1 else 1
            if self._max_threading_number == 1 and not self._network_health.is_set():
                if self._record_network_down_last_time is None:
                    self._record_network_down_last_time = time()
                elif time() - self._record_network_down_last_time > \
                        GetPageByWebWithAnotherProcessAndMultiThreading.SHOW_NETWORK_DOWN_LIMIT_TIME:
                    self._network_health.set()
        self._result_queue.put(result)

    def run(self) -> None:
        while True:
            if self._task_queue.empty() and len(self._threading_pool) == 0:
                if self._exit_when_task_queue_empty.is_set():
                    self._exit_when_task_queue_empty.clear()
                    break
                else:
                    # No work yet; sleep briefly instead of busy-spinning.
                    sleep(0.05)
                    continue
            else:
                # 1. Prune threads that have finished. Rebuild the list
                #    rather than removing while iterating, which skips items.
                self._threading_pool = [t for t in self._threading_pool if t.is_alive()]
                # 2. Spawn new threads up to the current limit.
                #    Note: Queue.qsize() is not implemented on macOS.
                while self._task_queue.qsize() > 0 and len(self._threading_pool) < self._max_threading_number:
                    task = self._task_queue.get()
                    t = threading.Thread(target=self.get_page_context_and_return_in_queue,
                                         args=(task[0], *task[1:]))
                    self._threading_pool.append(t)
                    t.start()
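
# ---------------------------------------------------------------------------
# Usage sketch (not part of the original module): a minimal driver showing the
# intended life cycle of GetPageByWebWithAnotherProcessAndMultiThreading, as
# described in its docstring. The URLs below are placeholders, and the
# queues/events are created here purely for illustration; adapt the wiring to
# however the surrounding project constructs these objects.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    task_queue = Queue()
    result_queue = Queue()
    exit_sign = Event()       # set by the producer once all tasks are queued
    network_health = Event()  # set by the crawler to report "network is down"

    crawler = GetPageByWebWithAnotherProcessAndMultiThreading(
        task_queue, result_queue, exit_sign, network_health)
    crawler.start()

    # Each task is a tuple: the URL first, then any extra values that are
    # echoed back in the result (here, an id used to match results to tasks).
    urls = ['https://example.com/page1', 'https://example.com/page2']
    for i, url in enumerate(urls):
        crawler.add_task((url, i))

    # Tell the crawler no more tasks are coming; it clears the flag and stops
    # once the task queue is drained and every worker thread has finished.
    exit_sign.set()

    for _ in urls:
        status, payload, task_id = result_queue.get()
        if status == 'success':
            print(task_id, 'fetched', len(payload), 'characters')
        else:
            print(task_id, 'failed for', payload)  # payload is the url here

    crawler.join()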