dcs/dcs/tests/spider_task.py
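
# spider_task.py: splits a client's crawl request into page-range partial tasks,
# runs them either locally (Crawler threads driving a headless Edge browser via
# the helpers in dcs.tests.zhiwang) or on free remote nodes, and composes the
# results into a single response for the client.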


import csv  # csv.writer is used in write2csv below
import multiprocessing
import socket
import threading
from typing import Optional

from loguru import logger
from msedge.selenium_tools import Edge, EdgeOptions

from conf.config import global_var, get_free_sockets, get_crawl_result, get_by_cookie, set_state_socket
from dcs.tests.zhiwang import *
from dcs.tools.database import get_crawl_result_by_crawl_id, write_result2database
from dcs.tools.database import get_last_crawl_id, create_crawl_result_table


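# Result sinks: write2database stores one row per (author, paper) pair in the
# per-user crawl-result table; write2csv dumps the same columns to a local csv file.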
def write2database(paper: Paper, table_name: str, last_crawl_id: int):
    logger.info(f'writing to database: {paper.title}')
    for author in paper.authors:
        if author.name:
            write_result2database([author.name, author.college, author.major, paper.title],
                                  table_name, last_crawl_id)


def write2csv(papers: list, file_name='./paper_author.csv'):
    # open the output file
    f_papers_authors = open(file_name, 'w', encoding='utf-8', newline='')
    writer_p_a = csv.writer(f_papers_authors)  # build a csv writer on top of the file object
    writer_p_a.writerow(["name", "college", "major", "paper"])  # csv header row
    # walk every crawled paper
    for paper in papers:
        # write one row per author into paper_author.csv
        for author in paper.authors:
            if author.name:
                # print(author + " ")
                writer_p_a.writerow([author.name, author.college, author.major, paper.title])
    # close the file
    f_papers_authors.close()
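

# A Crawler runs a single partial task in its own thread: it drives a headless
# Edge session (image and stylesheet loading disabled to speed up page loads),
# walks the assigned page range with driver_open/change_page/spider, and stores
# every crawled paper in the per-user '<user_name>_crawl_result' table.  Note
# that run() currently calls test_simulation() instead of crawl_zhiwang(), so
# dummy rows are written rather than real crawl results.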
class Crawler(threading.Thread):
    def __init__(self, partial_task: 'Spider_partial_task', last_crawl_id):
        super(Crawler, self).__init__()
        self.partial_task = partial_task
        self.last_crawl_id = last_crawl_id
        self.edge_driver_path = dict(global_var.configs.items('crawler'))['edge_driver_path']

    def crawl_zhiwang(self, user_name=None):
        edge_options = EdgeOptions()
        edge_options.use_chromium = True
        edge_options.add_argument('--headless')
        no_image_loading = {"profile.managed_default_content_settings.images": 2,
                            'permissions.default.stylesheet': 2}
        edge_options.add_experimental_option("prefs", no_image_loading)
        driver = Edge(options=edge_options, executable_path=self.edge_driver_path)
        soup = driver_open(driver, self.partial_task.word)  # search for the keyword
        # logger.debug(self.last_crawl_id)
        # logger.debug(self.partial_task.word)
        papers = []  # holds the crawled papers
        table_name = f'{user_name}_crawl_result'
        create_crawl_result_table(table_name=table_name)
        # last_crawl_id = get_last_crawl_id(table_name=table_name)
        self.partial_task.crawl_id = self.last_crawl_id + 1
        paper_id = 0

        # crawl the first result page
        if self.partial_task.pages_start == 1:
            spider(driver, soup, papers)
            self.partial_task.pages_start += 1
        while paper_id < len(papers):
            write2database(papers[paper_id], table_name=table_name, last_crawl_id=self.last_crawl_id)
            paper_id += 1

        # crawl the remaining pages
        while self.partial_task.pages_start < self.partial_task.pages_end:
            content = change_page(driver, self.partial_task.pages_start)
            spider(driver, content, papers)
            while paper_id < len(papers):
                write2database(papers[paper_id], table_name=table_name, last_crawl_id=self.last_crawl_id)
                paper_id += 1
            self.partial_task.pages_start += 1
        driver.close()
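
    # test_simulation stands in for a real crawl during development: it inserts
    # three dummy rows into the same per-user table and immediately marks the
    # whole page range as finished so that Spider_task.run() can terminate.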
    def test_simulation(self, user_name):
        table_name = f'{user_name}_crawl_result'
        create_crawl_result_table(table_name=table_name)
        last_crawl_id = get_last_crawl_id(table_name=table_name)
        self.partial_task.crawl_id = last_crawl_id + 1
        # simulated crawl
        logger.debug('simulation crawling...')
        paper = Paper('test', [Author('test', 'test', 'test')])
        write2database(paper, table_name=table_name, last_crawl_id=last_crawl_id)
        write2database(paper, table_name=table_name, last_crawl_id=last_crawl_id)
        write2database(paper, table_name=table_name, last_crawl_id=last_crawl_id)
        # mark the partial task as finished
        self.partial_task.pages_start = self.partial_task.pages_end

    def run(self) -> None:
        try:
            # self.crawl_zhiwang(user_name=self.partial_task.cui.user_name)
            self.test_simulation(user_name=self.partial_task.cui.user_name)
        except Exception as e:
            logger.error(str(e))
            logger.error(e.__traceback__.tb_frame.f_globals["__file__"])  # file where the exception occurred
            logger.error(e.__traceback__.tb_lineno)  # line number where the exception occurred
        finally:
            logger.info(f'partial crawl task finished: {str(self.partial_task)}')
            self.partial_task.thread = None
            # self.partial_task.pages_start = self.partial_task.pages_end
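

# A Spider_partial_task is one slice of a client request: the same keyword and
# cookie, but only the half-open page range [pages_start, pages_end).  It is
# considered finished once pages_start has caught up with pages_end, and it
# records whether it was executed locally (by a Crawler) or on a remote node.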
class Spider_partial_task:
    def __init__(self, full_task: 'Spider_task', request_map: dict):
        self.full_task: Spider_task = full_task
        self.request_map = request_map
        self.thread: 'threading.Thread|None' = None
        self.word = self.request_map['word']
        self.pages_start = self.request_map['pages_start']
        self.pages_end = self.request_map['pages_end']
        self.cui = get_by_cookie(self.request_map['cookie'])
        self.task_type: Optional[str] = None
        self.crawl_id = None

    def is_partial_task_crawl_completely(self):
        return self.pages_start == self.pages_end

    def __str__(self):
        return f'{self.full_task.client_socket.getpeername(), self.request_map}'
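

# Spider_task handles one client request end to end: distribute_task() splits the
# requested page range into const_page-sized partial tasks, run() keeps assigning
# unfinished partial tasks to free remote nodes (falling back to local Crawler
# threads) until all of them report completion, and compose_result() merges the
# remote and local results into a single dict that is sent back to the client.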
class Spider_task(threading.Thread):
    def __init__(self, client_socket: socket.socket, request_map: dict):
        super().__init__()
        self.table_name = f'{Spider_partial_task(self, request_map).cui.user_name}_crawl_result'
        self.last_crawl_id = get_last_crawl_id(table_name=self.table_name)
        self.client_socket = client_socket
        self.request_map = request_map
        self.partial_tasks: list[Spider_partial_task] = []
        self.const_page = 1

    def distribute_task(self):
        # split the request into chunks of const_page pages each
        # [pages_start, pages_end) is half-open, e.g. [1, 3) means pages 1 and 2
        logger.info(f'distributing task: {self.client_socket.getpeername(), self.request_map}')
        pages_start = self.request_map['pages_start']
        pages_end = self.request_map['pages_end']
        while pages_start < pages_end:
            tmp = self.request_map.copy()
            tmp['pages_start'] = pages_start
            if pages_start + self.const_page < pages_end:
                pages_start += self.const_page
            else:
                pages_start = pages_end
            tmp['pages_end'] = pages_start
            # self.tasks.append((client_socket, tmp))
            self.partial_tasks.append(Spider_partial_task(self, tmp))
        logger.debug(self.partial_tasks)

    def is_all_task_crawled(self):
        for task in self.partial_tasks:
            # print(task.task_type, task.is_partial_task_crawl_completely())
            if not task.is_partial_task_crawl_completely():
                return False
        return True
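
    # Remote results arrive as {row_id: {...}} dicts via get_crawl_result() and are
    # also persisted into this user's table; local results are re-read from the
    # database by crawl_id and re-keyed starting at 1 before being merged into the
    # response that is pushed back through the communicator.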
    def compose_result(self):
        logger.debug('composing task...')
        result = dict()
        remote_result = get_crawl_result(self.request_map['cookie'])
        for result_map in remote_result:
            result.update(result_map)
            create_crawl_result_table(table_name=self.table_name)
            for id, data in result_map.items():
                write_result2database([data['name'], data['college'], data['major'], data['title']],
                                      self.table_name, self.last_crawl_id)
        for task in self.partial_tasks:
            # print(task.task_type)
            if task.task_type == 'local':
                local_result = dict()
                local_result_database = get_crawl_result_by_crawl_id(
                    f"{get_by_cookie(task.request_map['cookie']).user_name}_crawl_result",
                    task.crawl_id)
                initial_id = local_result_database[0][0]
                for res in local_result_database:
                    local_result.update({res[0] - initial_id + 1:
                                         {'name': res[1], 'college': res[2], 'major': res[3], 'paper': res[4]}})
                logger.debug(local_result)
                result.update(local_result)
        result.update({'crawl_id': self.last_crawl_id + 1, 'table_name': self.table_name})
        # global_var.communicator.add_info('response', self.client_socket.getpeername(), result)
        global_var.communicator.add_response('response', self.client_socket, result)
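
    # The dispatch loop below relies on Python's for/else: if no free remote node is
    # found (the for-loop over free_remote_nodes finishes without break), the partial
    # task is started on a local Crawler thread instead.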
    def run(self) -> None:
        global_var.communicator.add_response('crawling state', self.client_socket,
                                             {'crawling state': 'starting, please wait...'})
        self.distribute_task()
        free_remote_nodes = list(get_free_sockets())
        logger.debug(free_remote_nodes)
        while True:  # keep polling; a single pass over the for-loop is not enough
            for task in self.partial_tasks:
                if task.is_partial_task_crawl_completely():
                    continue
                else:
                    current_task_thread = task.thread
                    if current_task_thread is None:
                        for f_node in free_remote_nodes:
                            # print(free_remote_nodes)
                            address = f_node.getpeername()
                            # address = (address[0], address[1] + 1)
                            logger.debug('generating remote task')
                            task.thread = global_var.requester
                            task.task_type = 'remote'
                            global_var.requester.get(address, task)
                            free_remote_nodes.remove(f_node)  # TODO
                            set_state_socket(f_node, 'busy')
                            break
                        else:
                            logger.debug('generating local task')
                            crawler = Crawler(task, self.last_crawl_id)
                            task.thread = crawler
                            task.task_type = 'local'
                            crawler.start()
            if self.is_all_task_crawled():
                break
            # sleep(3)
        self.compose_result()
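

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only; not executed anywhere in this module): the
# server is assumed to hand every accepted client connection together with the
# decoded request to a Spider_task.  Only the keys below are read by this
# module, and the cookie is assumed to be registered already so that
# get_by_cookie() can resolve it to a user.
#
#   request_map = {
#       'word': 'graph neural networks',  # search keyword
#       'pages_start': 1,                 # first result page (inclusive)
#       'pages_end': 4,                   # last result page (exclusive)
#       'cookie': '<registered session cookie>',
#   }
#   Spider_task(client_socket, request_map).start()
# ---------------------------------------------------------------------------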