Basic functionality is now largely complete. Optimizations: stop writing to the database row by row and instead write all of a task's results in a single batch once crawling finishes; improve the distributed cluster; centralize system configuration so that parameters only need to be changed in settings.ini; improve the task-dispatch module so that, when more tasks arrive than can be handled, the excess tasks are put into a waiting state and only start running once a distributed node or a server-side crawler becomes idle.

master
wufayuan 2 years ago
parent ff11f3bfc1
commit 27899262f5
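
The first optimization in the commit message (write once per task instead of once per row) boils down to the pattern sketched below. This is an illustrative sketch with hypothetical names (CrawlTask, flush_results), not the repo's actual classes; in the diff itself the buffer is Spider_task.results and the single write is write_results2database().

# Sketch: collect rows in memory while crawling, write them once when the task finishes.
from typing import List, Tuple

Row = Tuple[str, str, str, str]  # (name, college, major, paper)

def flush_results(rows: List[Row]) -> None:
    # Stand-in for the single batch write (write_results2database in the real code).
    print(f"writing {len(rows)} rows in one batch")

class CrawlTask:
    def __init__(self) -> None:
        self.results: List[Row] = []  # rows accumulate here instead of hitting the database

    def on_author(self, name: str, college: str, major: str, paper: str) -> None:
        self.results.append((name, college, major, paper))

    def finish(self) -> None:
        flush_results(self.results)  # one write per task, after crawling is done

if __name__ == "__main__":
    task = CrawlTask()
    task.on_author("Alice", "CS", "AI", "Paper A")
    task.on_author("Bob", "EE", "Signals", "Paper B")
    task.finish()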

@@ -47,7 +47,13 @@ def add_user(user_name, login_time, login_state, state, cookie, address=None):
global_var.current_user_info.append(CUI(user_name, login_time, login_state, state, cookie, address))
def set_state_client(cookie, state):
def set_state_client(cookie, state=None, address=None):
if address:
for i in global_var.current_user_info:
if i.address == address:
i.state = state
break
return
for i in global_var.current_user_info:
if i.cookie == cookie:
i.state = state

@@ -1,9 +1,16 @@
[server]
ip = 10.129.77.113
ip = 127.0.0.1
port = 7777
daemon = True
buffer_size = 8 * 1024 * 1024
[crawler]
edge_driver_path = G:\course\yykf\dcs\bin\msedgedriver.exe
max_count_of_crawlers = 1
[database]
ip = 127.0.0.1
user = root
password = xwdjzwy5252
database = test
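
For reference, the centralized settings.ini above is consumed with ConfigParser. A minimal, self-contained sketch follows (read_string stands in for the con.read('../conf/settings.ini', encoding='utf-8') call used by the server; the buffer_size expression really is evaluated with eval() in the server hunk further down):

# Sketch: reading the [server]/[crawler] settings shown above with ConfigParser.
from configparser import ConfigParser

SAMPLE = '''
[server]
ip = 127.0.0.1
port = 7777
daemon = True
buffer_size = 8 * 1024 * 1024
[crawler]
max_count_of_crawlers = 1
'''

con = ConfigParser()
con.read_string(SAMPLE)  # the server reads the real settings.ini file instead

server = dict(con.items('server'))
port = int(server['port'])
buffer_size = eval(server['buffer_size'])  # arithmetic expression -> 8388608
max_crawlers = int(dict(con.items('crawler'))['max_count_of_crawlers'])
print(server['ip'], port, buffer_size, max_crawlers)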

@@ -2,14 +2,12 @@ import json
import multiprocessing
import socket
import struct
import sys
from configparser import ConfigParser
from json import JSONDecoder
from msedge.selenium_tools import Edge
from msedge.selenium_tools import EdgeOptions
sys.path.append(r'F:\Users\28587\dcs')
from dcs.tests.zhiwang import *
from dcs.tools import message_process as mp
from dcs.tools.message_process import parse_request, generate_response
@@ -72,6 +70,7 @@ def send_request(socket2server, req):
def crawl(request_map) -> dict:
result_map = crawl_zhiwang(request_map['word'], request_map['pages_start'], request_map['pages_end'])
# sleep(10)
# result_map = {0: {'name': 'remote', 'college': 'remote', 'major': 'remote', 'title': 'remote'},
# 1: {'name': 'remote1', 'college': 'remote1', 'major': 'remote', 'title': 'remote'}}
return result_map
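
Judging from the commented-out simulation data above, a remote node's crawl() returns an index-keyed dict of author records ({0: {'name': ..., 'college': ..., 'major': ..., 'title': ...}, ...}). On the server, compose_result() then folds each remote map into a flat results list, roughly as in this sketch:

# Sketch: flattening index-keyed result maps from remote nodes into (name, college, major, title) rows.
remote_maps = [
    {0: {'name': 'remote', 'college': 'remote', 'major': 'remote', 'title': 'remote'}},
    {0: {'name': 'remote1', 'college': 'remote1', 'major': 'remote', 'title': 'remote'}},
]

results = []  # same shape as Spider_task.results
for result_map in remote_maps:
    for _, data in result_map.items():
        results.append((data['name'], data['college'], data['major'], data['title']))

print(results)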

@@ -4,12 +4,12 @@ from loguru import logger
from dcs.clients.client import Client
start = 9000
ip = '10.129.77.113'
ip = '127.0.0.1'
port = 7777
local_ip = '10.129.77.113'
local_ip = ip
local_port = None
# number of distributed nodes to start
count = 5
count = 1
if __name__ == '__main__':
clients = []

File diff suppressed because one or more lines are too long

@@ -10,7 +10,6 @@ from dcs.user_process import UP
from dcs.communicate import Communicator
logger.info('[SERVER] starting the servers...')
create_user_info()
logger.add('./dcs.log', rotation='10 MB', enqueue=True, backtrace=True, diagnose=True)
logger.info('[SERVER] reading config args...')
configFile = '../conf/settings.ini'
@@ -19,6 +18,7 @@ con.read(configFile, encoding='utf-8')
global_var.configs = con
items = con.items('server')
items = dict(items)
create_user_info()
global_var.server = Server(str(items['ip']), int(items['port']), eval(items['buffer_size']))
global_var.server.daemon = items['daemon']

@@ -4,6 +4,7 @@ from time import sleep
from loguru import logger
from conf.config import global_var
from dcs.tests.spider_task import Spider_task
@@ -12,6 +13,8 @@ class Spider(threading.Thread):
super(Spider, self).__init__()
self.tasks: list[Spider_task] = []
self.daemon = True
self.max_count_of_crawlers = int(dict(global_var.configs.items('crawler'))['max_count_of_crawlers'])
self.crawlers = 0
def add_task(self, request_map: dict, client_socket: socket.socket):
self.tasks.append(Spider_task(client_socket, request_map))
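
max_count_of_crawlers (from the [crawler] section) caps how many local Crawler threads may run at once; the crawlers counter introduced here is incremented when a local Crawler starts (see Spider_task.run further down) and decremented once a partial task's pages are done, and tasks beyond the cap stay waiting in the dispatch loop. The same bound could also be expressed with a semaphore; the following is an alternative sketch, not the repo's implementation:

# Alternative sketch: bounding concurrent crawler threads with a semaphore.
import threading
import time

MAX_COUNT_OF_CRAWLERS = 1  # mirrors [crawler] max_count_of_crawlers
slots = threading.BoundedSemaphore(MAX_COUNT_OF_CRAWLERS)

def crawl_pages(task_id: int) -> None:
    with slots:  # blocks while all crawler slots are busy
        print(f"task {task_id} crawling")
        time.sleep(0.1)  # stand-in for the real page crawl

threads = [threading.Thread(target=crawl_pages, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()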

@@ -1,4 +1,5 @@
import socket
import struct
import threading
from loguru import logger
@@ -14,7 +15,13 @@ class RequestHandler(threading.Thread):
def run(self) -> None:
try:
while True:
request_map = parse_request(self.client_socket)
try:
request_map = parse_request(self.client_socket)
except struct.error:
break
except Exception as e:
logger.error(f'[Error] {e.__class__.__name__}: {str(e)}')
break
if request_map['action'] == 'end':
logger.info(f"[REQUEST] end: communication over from {self.client_socket.getpeername()}!")
@@ -35,4 +42,4 @@ class RequestHandler(threading.Thread):
global_var.communicator.add_response('error', self.client_socket,
{request_map['action']: f"no action {request_map['action']}!"})
except Exception as e:
logger.error(str(e))
logger.error(f'[Error] {e.__class__.__name__}: {str(e)}')
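
The new except struct.error branch treats a failed header unpack as "the client has closed the connection" and exits the handler loop cleanly. Assuming a length-prefixed wire format (parse_request comes from dcs.tools.message_process, and its exact header layout is not shown in this diff), the failure mode looks roughly like this:

# Sketch of length-prefixed parsing; the 4-byte big-endian header is an assumption, not the repo's documented format.
import json
import socket
import struct

HEADER = struct.Struct('!I')

def parse_request_sketch(sock: socket.socket) -> dict:
    header = sock.recv(HEADER.size)
    # If the peer has closed the socket, recv() returns b'' (or a short read) and
    # unpack() raises struct.error -- which is what RequestHandler.run() catches to break out.
    (length,) = HEADER.unpack(header)
    body = b''
    while len(body) < length:
        chunk = sock.recv(length - len(body))
        if not chunk:
            raise ConnectionError('peer closed mid-message')
        body += chunk
    return json.loads(body.decode('utf-8'))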

@@ -1,47 +1,30 @@
import csv
import random
import socket
import threading
from time import sleep
from typing import Optional
from msedge.selenium_tools import Edge
from msedge.selenium_tools import EdgeOptions
from conf.config import global_var, get_free_addresses, get_crawl_result, get_by_cookie, set_state_client
from dcs.tests.zhiwang import *
from dcs.tools.database import get_crawl_result_by_crawl_id, write_result2database
from dcs.tools.database import get_last_crawl_id, create_crawl_result_table
from dcs.tools.database import write_results2database
def write2database(paper: Paper, table_name: str, last_crawl_id: int):
logger.info(f'[DATABASE] writing to database: {paper.title}')
def write2results(paper: Paper, results: list):
for author in paper.authors:
if author.name:
write_result2database([author.name, author.college, author.major, paper.title], table_name, last_crawl_id)
def write2csv(papers: list, file_name='./paper_author.csv'):
# write to the output file
f_papers_authors = open(file_name, 'w', encoding='utf-8', newline='')
writer_p_a = csv.writer(f_papers_authors)  # build a csv writer on top of the file object
writer_p_a.writerow(["name", "college", "major", "paper"])  # header row of the csv file
# iterate over every paper
for paper in papers:
# write the author rows into paper_author.csv
for author in paper.authors:
if author.name:
# print(author + " ")
writer_p_a.writerow([author.name, author.college, author.major, paper.title])
# close the file
f_papers_authors.close()
results.append((author.name, author.college, author.major, paper.title))
class Crawler(threading.Thread):
def __init__(self, partial_task: 'Spider_partial_task', last_crawl_id):
def __init__(self, partial_task: 'Spider_partial_task', last_crawl_id, results):
super(Crawler, self).__init__()
self.partial_task = partial_task
self.last_crawl_id = last_crawl_id
self.results = results
self.edge_driver_path = dict(global_var.configs.items('crawler'))['edge_driver_path']
def crawl_zhiwang(self, user_name=None):
@@ -64,14 +47,14 @@ class Crawler(threading.Thread):
spider(driver, soup, papers)
self.partial_task.pages_start += 1
while paper_id < len(papers):
write2database(papers[paper_id], table_name=table_name, last_crawl_id=self.last_crawl_id)
write2results(papers[paper_id], results=self.results)
paper_id += 1
while self.partial_task.pages_start < self.partial_task.pages_end:
content = change_page(driver, self.partial_task.pages_start)
spider(driver, content, papers)
while paper_id < len(papers):
write2database(papers[paper_id], table_name=table_name, last_crawl_id=self.last_crawl_id)
write2results(papers[paper_id], results=self.results)
paper_id += 1
self.partial_task.pages_start += 1
driver.close()
@@ -85,11 +68,12 @@ class Crawler(threading.Thread):
# simulated crawling
logger.debug('simulation crawling...')
paper = Paper('test', [Author('test', 'test', 'test')])
write2database(paper, table_name=table_name, last_crawl_id=last_crawl_id)
write2database(paper, table_name=table_name, last_crawl_id=last_crawl_id)
write2database(paper, table_name=table_name, last_crawl_id=last_crawl_id)
write2results(paper, results=self.results)
write2results(paper, results=self.results)
write2results(paper, results=self.results)
# over
sleep(10)
self.partial_task.pages_start = self.partial_task.pages_end
def run(self) -> None:
@@ -116,7 +100,11 @@ class Spider_partial_task:
self.crawl_id = None
def is_partial_task_crawl_completely(self):
return self.pages_start == self.pages_end
finished = (self.pages_start == self.pages_end)
if finished:
if self.task_type == 'local':
global_var.spider.crawlers -= 1
return finished
def __str__(self):
return f'{self.full_task.client_socket.getpeername(), self.request_map}'
@@ -125,12 +113,14 @@ class Spider_partial_task:
class Spider_task(threading.Thread):
def __init__(self, client_socket: socket.socket, request_map: dict):
super().__init__()
self.free_remote_nodes = None
self.table_name = f'{Spider_partial_task(self, request_map).cui.user_name}_crawl_result'
self.last_crawl_id = get_last_crawl_id(table_name=self.table_name)
self.client_socket = client_socket
self.request_map = request_map
self.partial_tasks: list[Spider_partial_task] = []
self.const_page = 1
self.results = []
def distribute_task(self):
# distribute tasks, 3 pages as a task
@@ -156,27 +146,17 @@ class Spider_task(threading.Thread):
def compose_result(self):
logger.info('[COMPOSE] composing task...')
result = dict()
logger.info(f'[RESULT] {self.results}')
remote_result = get_crawl_result(self.request_map['cookie'])
for result_map in list(remote_result):
result.update(result_map)
create_crawl_result_table(table_name=self.table_name)
for _, data in result_map.items():
write_result2database([data['name'], data['college'], data['major'], data['title']], self.table_name, self.last_crawl_id)
for task in self.partial_tasks:
if task.task_type == 'local':
local_result = dict()
local_result_database = get_crawl_result_by_crawl_id(
f"{get_by_cookie(task.request_map['cookie']).user_name}_crawl_result",
task.crawl_id)
initial_id = local_result_database[0][0]
for res in local_result_database:
local_result.update({res[0]-initial_id+1: {'name': res[1], 'college': res[2], 'major': res[3], 'paper': res[4]}})
logger.info(f'[RESULT] {local_result}')
result.update(local_result)
result.update({'crawl_id': self.last_crawl_id+1, 'table_name': self.table_name})
write2results(Paper(data['title'], [Author(data['name'], data['college'], data['major'])]), self.results)
logger.info(f'[DATABASE] writing crawl results to database...')
write_results2database(self.results, self.table_name, self.last_crawl_id)
result = {'crawl_id': self.last_crawl_id+1, 'table_name': self.table_name, 'data': self.results}
global_var.communicator.add_response('response', self.client_socket, result)
def run(self) -> None:
@@ -184,31 +164,38 @@ class Spider_task(threading.Thread):
{'crawling state': 'starting, please wait...'})
self.distribute_task()
free_remote_nodes = list(get_free_addresses())
logger.info(f'[REMOTE] free nodes: {free_remote_nodes}')
while True:
self.free_remote_nodes = list(get_free_addresses())
random.shuffle(self.free_remote_nodes)
logger.info(f'[REMOTE] free nodes: {self.free_remote_nodes}')
for task in self.partial_tasks:
if task.is_partial_task_crawl_completely():
continue
else:
random.shuffle(free_remote_nodes)
current_task_thread = task.thread
if current_task_thread is None:
for f_node in free_remote_nodes:
for f_node in self.free_remote_nodes:
address = f_node
logger.info('[TASK] generating remote task')
logger.info(f'[TASK] generating remote task {task.request_map}')
task.thread = global_var.requester
task.task_type = 'remote'
global_var.requester.get(address, task)
free_remote_nodes.remove(f_node)
set_state_client(f_node, 'busy')
set_state_client('busy', address=f_node)
sleep(1)
self.free_remote_nodes.remove(f_node)
break
else:
logger.info('[TASK] generating local task')
crawler = Crawler(task, self.last_crawl_id)
task.thread = crawler
task.task_type = 'local'
crawler.start()
logger.info(f'[TASK] generating local task {task.request_map}')
if global_var.spider.crawlers >= global_var.spider.max_count_of_crawlers:
logger.warning(f'[TASK] generate failed, crawlers exceed! spider task {task.request_map} is at state waiting...')
break
else:
crawler = Crawler(task, self.last_crawl_id, self.results)
task.thread = crawler
task.task_type = 'local'
crawler.start()
global_var.spider.crawlers += 1
if self.is_all_task_crawled():
break
sleep(5)  # poll every 5 seconds
self.compose_result()

@@ -1,18 +1,18 @@
import pymysql
from loguru import logger
import conf.config as config
import dcs.tools.cookie as cookie
from conf import config
from conf.config import global_var as var
# get a database connection object
def mysql_conn(host='10.129.16.173', user='root', passwd='427318Aa', db='test'):
# def mysql_conn(host='10.129.16.155', user='root', passwd='427318Aa', db='test'):
# def mysql_conn(host='192.168.43.64', user='root', passwd='427318Aa', db='test'):
# def mysql_conn(host='127.0.0.1', user='root', passwd='xwdjzwy5252', db='test'):
def mysql_conn():
database = dict(var.configs.items('database'))
try:
# logger.debug('connecting to database...')
conn = pymysql.connect(host=host, user=user, passwd=passwd, db=db)
conn = pymysql.connect(host=database['ip'], user=database['user'], passwd=database['password'],
db=database['database'])
return conn
except Exception as e:
logger.error(f'[ERROR] {str(e)}')
@@ -198,18 +198,18 @@ def create_user_info(table_name: str = 'user_info'):
create_table(create_sql)
def write_result2database(res: list, table_name: str, last_crawl_id: int):
def write_results2database(res: list, table_name: str, last_crawl_id: int):
try:
logger.debug('writing to database...')
logger.info(f'[DATABASE] writing {last_crawl_id+1}st crawl results to table {table_name} in database...')
conn = mysql_conn()
cur = conn.cursor()
insert_sql = f"insert into {table_name} (name,college,major,paper,crawl_id,time) values ('%s','%s','%s','%s',%s,now())" % (
res[0], res[1], res[2], res[3], last_crawl_id + 1)
cur.execute(insert_sql)
insert_sql = f"insert into {table_name} (name,college,major,paper,crawl_id,time) values (%s,%s,%s,%s,{last_crawl_id + 1},now())"
cur.executemany(insert_sql, res)
conn.commit()
cur.close()
conn.close()
info = '插入成功'
logger.info(f'[DATABASE] writing successful of {last_crawl_id + 1}st crawl results!')
except Exception as e:
logger.error(f'[ERROR] {str(e)}')
info = '插入失败'
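
write_results2database() above now binds the row fields through executemany but still formats crawl_id (and the table name) into the SQL string. A fully parameterized variant for the values, sketched under the assumption of the same table layout and a pymysql connection from mysql_conn():

# Sketch: batch insert with crawl_id bound per row instead of formatted into the SQL text.
def write_results_sketch(conn, rows, table_name: str, last_crawl_id: int) -> None:
    # conn: an open pymysql connection; rows: (name, college, major, paper) tuples
    # (the table name cannot be a bound parameter, so it stays in the f-string)
    sql = (f"insert into {table_name} (name, college, major, paper, crawl_id, time) "
           "values (%s, %s, %s, %s, %s, now())")
    with conn.cursor() as cur:
        cur.executemany(sql, [(*row, last_crawl_id + 1) for row in rows])
    conn.commit()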

@@ -137,7 +137,7 @@ function execute(cmd) { // invoke a cmd command
})
}
app.post('/check', function (req, res) {
execute('python connect.py --ip 127.0.0.1 --port 7777 crawling --word computer --cookie god --pages_start 1 --pages_end 5');
execute('python connect.py --ip 127.0.0.1 --port 7777 crawling --word computer --cookie god --pages_start 1 --pages_end 4');
fs.readFile('./result.json', 'utf-8', function (err, data) {
if (err) {
console.error(err);

File diff suppressed because it is too large