Refactored the whole project to better fit a model of multiple cooperating subsystems, and implemented the crawl task system, including task distribution and the merging of remote and local results. The subsystems poll for work; once a task is accepted, a dedicated worker thread is started, so the system now genuinely supports concurrent access by multiple users. Also completed the cookie mechanism (user authentication and identification), the extraction and composition of results from the database, and the multi-system coordination mechanism, and implemented initial client-side code.

master V1.0
wufayuan 3 years ago
parent a1a73aa412
commit b1a90b646c
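
All traffic in this commit, between clients and the main server as well as between nodes, uses one wire format: an 8-byte big-endian length prefix (struct.pack("!Q", ...)) followed by a UTF-8 JSON body, as implemented by the generate_request/read_bytes/parse_request helpers that recur in the diffs below. A minimal self-contained sketch of that framing; the socketpair round-trip is illustrative, not part of the commit:

import json
import socket
import struct


def generate_request(request: dict) -> bytes:
    # 8-byte big-endian length header followed by the UTF-8 JSON body
    body = json.JSONEncoder().encode(request).encode("utf-8")
    return struct.pack("!Q", len(body)) + body


def read_bytes(s: socket.socket, size: int) -> bytes:
    # keep reading until size bytes arrive or the peer closes the socket
    data = b''
    while len(data) < size:
        chunk = s.recv(size - len(data))
        if not chunk:
            break
        data += chunk
    return data


def parse_request(s: socket.socket) -> dict:
    size = struct.unpack("!Q", read_bytes(s, 8))[0]
    return json.JSONDecoder().decode(read_bytes(s, size).decode("utf-8"))


if __name__ == '__main__':
    # loopback round-trip over a connected socket pair
    a, b = socket.socketpair()
    a.sendall(generate_request({'action': 'report_free', 'cookie': 'abc'}))
    print(parse_request(b))  # {'action': 'report_free', 'cookie': 'abc'}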

.gitignore vendored

@@ -1,2 +1,2 @@
 !/dcs/tests/zhiwang.py
-!/dcs/tests/cookie.py
+!/dcs/tools/cookie.py

@@ -2,6 +2,12 @@ import socket
 from collections import deque
+# from dcs.tests.spider import Spider
+# from dcs.tests.requester import Requester
+# from dcs.tests.user_process import UP
+# from dcs.tests.response import Responser
 class CUI:
     def __init__(self, user_name, login_time, login_state, state, cookie, st=None):
         self.user_name = user_name
@@ -14,9 +20,15 @@ class CUI:
 class global_var:
-    """Global variables go here; preferably give each one an initial value."""
+    """Global variables go here."""
     free_spiders = []
     current_user_info: list[CUI] = []
+    requester = None
+    server = None
+    spider = None
+    up = None
+    communicator = None
+    server_socket = None
 def get_free_sockets() -> tuple[socket.socket]:
@@ -38,19 +50,35 @@ def add_user(user_name, login_time, login_state, state, cookie, st=None):
     global_var.current_user_info.append(CUI(user_name, login_time, login_state, state, cookie, st))
-def set_state_socket(cookie, state):
+def set_state_socket(sockett, state):
     for i in global_var.current_user_info:
-        if i.cookie == cookie:
+        if i.socket == sockett:
             i.state = state
             break
+def set_crawl_result(cookie, result):
+    for i in global_var.current_user_info:
+        if i.cookie == cookie:
+            i.crawl_result.append(result)
+            # print(f'hhh: {cookie}', result)
+            break
+def get_crawl_result(cookie):
+    for i in global_var.current_user_info:
+        if i.cookie == cookie:
+            # print(f'hgf: {cookie}', i.crawl_result)
+            return i.crawl_result
 def get_by_cookie(cookie):
     for i in global_var.current_user_info:
         if i.cookie == cookie:
             return i
     return None
 def delete_user(cookie):
-    i = global_var.get_by_cookie(cookie)
+    i = get_by_cookie(cookie)
     global_var.current_user_info.remove(i)

@@ -1,5 +1,5 @@
 [server]
 port = 7777
 daemon = True
+buffer_size = 8 * 1024 * 1024
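
The value is stored as a Python expression, and the main module (see its diff below) turns it into an integer with eval(items['buffer_size']). eval executes arbitrary code from the config file; a sketch of a stricter parser for such '8 * 1024 * 1024'-style products, assuming the same settings.ini layout:

from configparser import ConfigParser


def parse_size(expr: str) -> int:
    """Accept a product of integer literals, e.g. '8 * 1024 * 1024', without eval."""
    product = 1
    for factor in expr.split('*'):
        product *= int(factor.strip())
    return product


print(parse_size('8 * 1024 * 1024'))  # 8388608

# read from settings.ini the way the main module does, minus the eval:
con = ConfigParser()
con.read('../conf/settings.ini', encoding='utf-8')
if con.has_option('server', 'buffer_size'):
    buffer_size = parse_size(con.get('server', 'buffer_size'))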

@@ -0,0 +1,79 @@
import json
import socket
import struct
from json import JSONDecoder
import threading
from dcs.tools import message_process as mp
from dcs.tools.message_process import parse_request, generate_response


class Crawl(threading.Thread):
    def __init__(self):
        super(Crawl, self).__init__()
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind(('', 9000))

    @staticmethod
    def crawl() -> dict:
        result_map = {0: {'name': 'remote', 'college': 'remote', 'major': 'remote', 'paper': 'remote'}}
        return result_map

    def run(self) -> None:
        self.server_socket.listen()
        while True:
            client_socket, _ = self.server_socket.accept()
            request_map = parse_request(client_socket)
            if request_map['type'] == 'request':
                print("receiving help request:\n" + json.dumps(request_map, ensure_ascii=False))
                response_map = self.crawl()
                response_map.update({'cookie': request_map['cookie']})
                client_socket.sendall(generate_response(response_map))
            if request_map['type'] == 'response':
                print("receiving response:\n" + json.dumps(request_map, ensure_ascii=False))
                # break


crawl = Crawl()
crawl.start()
# crawl.join()

with socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) as socket_to_server:
    socket_to_server.bind(('127.0.0.1', 9000))
    socket_to_server.connect(('127.0.0.1', 7777))

    request = {'action': 'register', 'user': 'wufayuan', 'password': '113818'}
    socket_to_server.sendall(mp.generate_request(request))
    responseJson = JSONDecoder().decode(
        mp.read_bytes(socket_to_server, struct.unpack('!Q', socket_to_server.recv(8))[0]).decode("utf-8"))
    print(responseJson)

    request = {'action': 'login', 'user': 'wufayuan', 'password': '113818'}
    socket_to_server.sendall(mp.generate_request(request))
    responseJson = JSONDecoder().decode(
        mp.read_bytes(socket_to_server, struct.unpack('!Q', socket_to_server.recv(8))[0]).decode("utf-8"))
    cookie = responseJson['cookie']
    print(responseJson)

    request = {'action': 'report_free', 'cookie': cookie}
    socket_to_server.sendall(mp.generate_request(request))
    responseJson = JSONDecoder().decode(
        mp.read_bytes(socket_to_server, struct.unpack('!Q', socket_to_server.recv(8))[0]).decode("utf-8"))
    print(responseJson)

    request = {'action': 'crawl zhiwang', 'word': 'computer', 'pages_start': 1, 'pages_end': 10,
               'cookie': cookie}
    socket_to_server.sendall(mp.generate_request(request))
    responseJson = JSONDecoder().decode(
        mp.read_bytes(socket_to_server, struct.unpack('!Q', socket_to_server.recv(8))[0]).decode("utf-8"))
    print(responseJson)

    request = {'action': 'end'}
    socket_to_server.sendall(mp.generate_request(request))

crawl.join()

@@ -0,0 +1,35 @@
import threading
import socket
from loguru import logger
from dcs.tools.message_process import generate_response, generate_request


class Communicator(threading.Thread):
    def __init__(self):
        super(Communicator, self).__init__()
        self.responser_list: list[tuple[str, socket.socket, dict]] = []
        self.info_list: list[tuple[tuple, dict]] = []

    def add_response(self, response_type: str, client_socket: socket.socket, response_map: dict):
        self.responser_list.append((response_type, client_socket, response_map))

    def add_info(self, info_type: str, address: tuple, info_map: dict):
        info_map.update({'type': info_type})
        self.info_list.append((address, info_map))

    def run(self) -> None:
        while True:
            for responser in self.responser_list:
                response_type, client_socket, response_map = responser[0], responser[1], responser[2]
                logger.info(f'sending response to {client_socket.getpeername()}: {response_map}')
                client_socket.sendall(generate_response(response_map))
                self.responser_list.remove(responser)
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) as socket_to_client:
                for info in self.info_list:
                    socket_to_client.connect(info[0])
                    logger.info(f'sending response to {info[0]}: {info[1]}')
                    socket_to_client.sendall(generate_request(info[1]))
                    self.info_list.remove(info)

File diff suppressed because it is too large

@@ -1,22 +1,52 @@
 # -*- coding: UTF-8 -*-
-from dcs.tests.server import Server
+from dcs.server import Server
 from configparser import ConfigParser
 from loguru import logger
-from dcs.tests.database import create_user_info
+from dcs.tools.database import create_user_info
+from dcs.requester import Requester
+from dcs.spider import Spider
+from conf.config import global_var
+from dcs.user_process import UP
+from dcs.communicate import Communicator
 create_user_info()
 logger.add('./dcs.log', rotation='10 MB', enqueue=True, backtrace=True, diagnose=True)
-logger.info('reading config args...')
+logger.debug('reading config args...')
 configFile = '../conf/settings.ini'
 con = ConfigParser()
 con.read(configFile, encoding='utf-8')
 items = con.items('server')
 items = dict(items)
-logger.info('starting the server...')
-server = Server(int(items['port']))
-server.daemon = items['daemon']
-server.start()
-server.join()
+logger.debug('starting the main server...')
+global_var.server = Server(int(items['port']), eval(items['buffer_size']))
+global_var.server.daemon = items['daemon']
+global_var.server.start()
+logger.debug('starting the requester server...')
+global_var.requester = Requester()
+global_var.requester.daemon = True
+global_var.requester.start()
+logger.debug('starting the spider server...')
+global_var.spider = Spider()
+global_var.spider.daemon = True
+global_var.spider.start()
+logger.debug('starting the user server...')
+global_var.up = UP()
+global_var.up.daemon = True
+global_var.up.start()
+logger.debug('starting the communicator server...')
+global_var.communicator = Communicator()
+global_var.communicator.daemon = True
+global_var.communicator.start()
+global_var.server.join()
+global_var.requester.join()
+global_var.spider.join()
+global_var.up.join()
+global_var.communicator.join()
 logger.warning('Overing...')

@@ -0,0 +1,92 @@
import socket
import struct
import threading
from collections import deque
from json import JSONEncoder, JSONDecoder
from time import sleep
from loguru import logger
from conf.config import set_crawl_result
from dcs.tests.spider_task import Spider_partial_task


def read_bytes(s: 'socket.socket', size: 'int') -> 'bytes':
    data = ''.encode('utf-8')
    while len(data) < size:
        rsp_data = s.recv(size - len(data))
        data += rsp_data
        if len(rsp_data) == 0:
            break
    return data


def generate_request(request) -> 'bytes':
    request_bytes = JSONEncoder().encode(request).encode("utf-8")
    return struct.pack("!Q", len(request_bytes)) + request_bytes


class Requester(threading.Thread):
    def __init__(self):
        super().__init__()
        self.daemon = True
        self.reqs = []
        pass

    def is_remote_task_complete(self, client_address, request_map):
        pass
        # for req in self.reqs:
        #     if req[0].client_address == client_address and req[0].request_map == request_map:
        #         print(req[1])
        #         return req[1]

    def get(self, client_address, task: Spider_partial_task):
        logger.info(f'sending crawl request to {str(client_address)}')
        req = Req(client_address, task)
        self.reqs.append(req)
        req.start()


class Req(threading.Thread):
    def __init__(self, client_address, task: Spider_partial_task):
        super(Req, self).__init__()
        self.client_address = client_address
        self.task: Spider_partial_task = task
        self.request_map = task.request_map
        self.responseJson = None

    def run(self) -> None:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) as socket_to_client:
            socket_to_client.connect(self.client_address)
            self.request_map.update({'type': 'request'})
            socket_to_client.sendall(generate_request(self.request_map))
            self.responseJson = JSONDecoder().decode(
                read_bytes(socket_to_client, struct.unpack('!Q', socket_to_client.recv(8))[0]).decode("utf-8"))
            # res.append(self.responseJson)
            # sleep(10)
            cookie = self.responseJson['cookie']
            del self.responseJson['cookie']
            logger.debug('receiving remote task result, saving...')
            set_crawl_result(cookie, self.responseJson)
            self.task.pages_start = self.task.pages_end  # finished
            self.task.thread = None
            logger.debug("result: " + str(self.responseJson))
            # global_var.requester.set_req_state(self.client_address, self.request_map, True)


if __name__ == '__main__':
    address = ('127.0.0.1', 7798)
    address1 = ('127.0.0.1', 7799)
    my_request = {'request': 'I am asking you...'}
    my_request1 = {'request1': 'I am asking you...'}
    res = deque()
    requester = Requester()
    requester.start()
    # requester.get(address, my_request)
    # requester.get(address1, my_request1)
    sleep(2)
    # print(requester1.get())
    # print(requester.get())
    print(res)

@@ -0,0 +1,36 @@
import threading
import socket
from loguru import logger
from dcs.tests.requestHandler import RequestHandler
from conf.config import global_var


class Server(threading.Thread):  # separate listening from handling so multiple clients can be served at once
    def __init__(self, port: 'int', buffer_size: 'int'):
        super().__init__()
        self.port: 'int' = port
        self.buffer_size = buffer_size
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind(('', port))
        global_var.server_socket = self.server_socket
        self.client_sockets: list[socket.socket] = []

    def run(self) -> None:
        self.server_socket.listen()
        while True:
            client_socket, _ = self.server_socket.accept()
            logger.debug(f'connected to client {client_socket.getpeername()}')
            self.client_sockets.append(client_socket)
            # request_header_size = struct.unpack("!Q", read_bytes(client_socket, 8))[0]
            # request_map = json.JSONDecoder().decode(read_bytes(client_socket, request_header_size).decode("utf-8"))
            # the 'end' request must be handled in the main thread, otherwise shutdown is not responsive
            # if request_map['action'] == 'end':
            #     logger.info(f"[REQUEST] end")
            #     logger.warning("communication over!")
            #     break
            r = RequestHandler(client_socket)
            r.start()
        # self.server_socket.close()

@@ -0,0 +1,44 @@
import socket
import threading
from dcs.tests.spider_task import Spider_task


class Spider(threading.Thread):
    def __init__(self):
        super(Spider, self).__init__()
        # self.tasks: list[tuple[socket.socket, dict]] = []
        self.tasks: list[Spider_task] = []
        self.daemon = True

    def add_task(self, request_map: dict, client_socket: socket.socket):
        # distribute tasks, 5 pages as a task
        # [pages_start, pages_end), like [1,3) means 1,2 page
        # pages_start = request_map['pages_start']
        # pages_end = request_map['pages_end']
        # while pages_start < pages_end:
        #     tmp = request_map.copy()
        #     tmp['pages_start'] = pages_start
        #     if pages_start + const_page <= pages_end:
        #         pages_start += const_page
        #     else:
        #         pages_start = pages_end
        #     tmp['pages_end'] = pages_start
        #     self.tasks.append((client_socket, tmp))
        self.tasks.append(Spider_task(client_socket, request_map))

    def run(self) -> None:
        while True:
            # free_remote_nodes = get_free_sockets()
            # for task in self.tasks:
            #     logger.info(f'processing crawl task...')
            #     for f_node in free_remote_nodes:
            #         address = f_node.getpeername()
            #         global_var.requester.get(address, task[1])
            #     global_var.responser.add_response('crawling state', task[0], {'crawling state': 'starting, please wait...'})
            #     spider_task = Spider_task(task[0], task[1])
            #     spider_task.start()
            #     self.tasks.remove(task)
            for task in self.tasks:
                task.start()
                self.tasks.remove(task)
            pass
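
Spider.run above, like UP.run and Communicator.run in this commit, polls its task list in a tight while True, which spins a CPU core even when idle and mutates the list while iterating over it. A queue-based variant is sketched below as an illustrative alternative to the committed polling design, not as part of the commit:

import queue
import threading


class QueueSpider(threading.Thread):
    def __init__(self):
        super().__init__()
        self.daemon = True
        self.tasks: 'queue.Queue' = queue.Queue()

    def add_task(self, task) -> None:
        self.tasks.put(task)  # thread-safe; no polling loop needed

    def run(self) -> None:
        while True:
            task = self.tasks.get()  # blocks until a task is queued
            task.start()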

@@ -1,188 +0,0 @@
# -*- coding: UTF-8 -*-
import json
import struct
import threading
from threading import Thread
import socket
from json import JSONEncoder, JSONDecoder
import sys
from dcs.tests.server import Server

# ------------------------------ config --------------------------------------------
if len(sys.argv) < 2:
    ip = "127.0.0.1"  # server ip
else:
    ip = sys.argv[1]
port = 7777  # server port


def read_bytes(s: 'socket.socket', size: 'int') -> 'bytes':
    """
    Read size bytes from the socket.
    :param s: the socket
    :param size: the number of bytes to read
    :return: the bytes read; if the socket closes early, the data returned may be shorter than size
    """
    data = ''.encode('utf-8')
    while len(data) < size:
        rsp_data = s.recv(size - len(data))
        data += rsp_data
        if len(rsp_data) == 0:
            break
    return data


def generate_request(request) -> 'bytes':
    """
    Build a request from the given dict.
    A request consists of an 8-byte header length plus the header data.
    :param request: dict
    :return: the request as bytes
    """
    request_bytes = JSONEncoder().encode(request).encode("utf-8")
    return struct.pack("!Q", len(request_bytes)) + request_bytes


class Client(Thread):
    def __init__(self, server_ip: str, server_port: int) -> None:
        """
        :param server_ip: server IP
        :param server_port: server port
        """
        super().__init__()
        self.ip = server_ip
        self.port = server_port
        self.cookie = None

    def test(self):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) as socket_to_server:
            socket_to_server.connect((self.ip, self.port))
            request = dict()
            request['action'] = 'test'
            full_request = generate_request(request)
            socket_to_server.sendall(full_request)
            responseJson = JSONDecoder().decode(
                read_bytes(socket_to_server, struct.unpack('!Q', socket_to_server.recv(8))[0]).decode("utf-8"))
            return responseJson['test']

    def crawling(self, word: str, pages_start: int, pages_end: int):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) as socket_to_server:
            socket_to_server.connect((self.ip, self.port))
            request = dict()
            request['action'] = 'crawl zhiwang'
            request['word'] = word
            request['pages_start'] = str(pages_start)
            request['pages_end'] = str(pages_end)
            request['cookie'] = '2b0fd361bbf0b986fbc20d989a224d66fe9cb13a'
            full_request = generate_request(request)
            socket_to_server.sendall(full_request)
            responseJson = JSONDecoder().decode(
                read_bytes(socket_to_server, struct.unpack('!Q', socket_to_server.recv(8))[0]).decode("utf-8"))
            return responseJson['crawl zhiwang']

    def report_status(self, status: str):
        # status: free or busy
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) as socket_to_server:
            socket_to_server.connect((self.ip, self.port))
            request = dict()
            request['action'] = 'report_' + status
            request['spider_info'] = (ip, port)
            request['cookie'] = self.cookie
            full_request = generate_request(request)
            socket_to_server.sendall(full_request)
            responseJson = JSONDecoder().decode(
                read_bytes(socket_to_server, struct.unpack('!Q', socket_to_server.recv(8))[0]).decode("utf-8"))
            return responseJson['report_' + status]

    def end(self):
        """End the communication."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) as socket_to_server:
            socket_to_server.connect((self.ip, self.port))
            request = dict()
            request['action'] = 'end'
            full_request = generate_request(request)
            socket_to_server.sendall(full_request)
            print("end communication!")

    def login(self, user, password):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) as socket_to_server:
            socket_to_server.connect((self.ip, self.port))
            request = dict()
            request['action'] = 'login'
            request['user'] = user
            request['password'] = password
            full_request = generate_request(request)
            socket_to_server.sendall(full_request)
            responseJson = JSONDecoder().decode(
                read_bytes(socket_to_server, struct.unpack('!Q', socket_to_server.recv(8))[0]).decode("utf-8"))
            # the two literals are the server's "wrong user name" / "wrong password" login failure strings
            if responseJson['login'] not in ['用户名错误,登录失败', '密码错误,登录失败']:
                self.cookie = responseJson['login']
            return responseJson['login']

    def run(self) -> None:
        print(self.login('2', '2'))
        print(self.report_status('free'))
        print(self.crawling(input("word:"), pages_start=1, pages_end=4))  # [3,4)
        self.end()


def response():
    socket_to_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)
    socket_to_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    socket_to_server.bind(('', 7798))
    socket_to_server.listen()
    while True:
        server_socket, _ = socket_to_server.accept()
        request_header_size = struct.unpack("!Q", read_bytes(server_socket, 8))[0]
        request_map = json.JSONDecoder().decode(read_bytes(server_socket, request_header_size).decode("utf-8"))
        print(request_map)
        response_map = {'response': 'I am replying...'}
        server_socket.sendall(generate_response(response_map))


def generate_response(response_map):
    response_binary = json.JSONEncoder().encode(response_map).encode("utf-8")
    response_binary_len = len(response_binary)
    response_binary_len_binary = struct.pack("!Q", response_binary_len)
    response_binary = response_binary_len_binary + response_binary
    return response_binary


download_task = Client(ip, port)
download_task.daemon = True
# download_task.start()
# download_task.join()
server = Server(7777)
server.daemon = True
server.start()
response()
server.join()

@@ -1,188 +0,0 @@
# -*- coding: UTF-8 -*-
import json
import struct
import threading
from threading import Thread
import socket
from json import JSONEncoder, JSONDecoder
import sys
from dcs.tests.server import Server

# ------------------------------ config --------------------------------------------
if len(sys.argv) < 2:
    ip = "127.0.0.1"  # server ip
else:
    ip = sys.argv[1]
port = 7777  # server port


def read_bytes(s: 'socket.socket', size: 'int') -> 'bytes':
    """
    Read size bytes from the socket.
    :param s: the socket
    :param size: the number of bytes to read
    :return: the bytes read; if the socket closes early, the data returned may be shorter than size
    """
    data = ''.encode('utf-8')
    while len(data) < size:
        rsp_data = s.recv(size - len(data))
        data += rsp_data
        if len(rsp_data) == 0:
            break
    return data


def generate_request(request) -> 'bytes':
    """
    Build a request from the given dict.
    A request consists of an 8-byte header length plus the header data.
    :param request: dict
    :return: the request as bytes
    """
    request_bytes = JSONEncoder().encode(request).encode("utf-8")
    return struct.pack("!Q", len(request_bytes)) + request_bytes


class Client(Thread):
    def __init__(self, server_ip: str, server_port: int) -> None:
        """
        :param server_ip: server IP
        :param server_port: server port
        """
        super().__init__()
        self.ip = server_ip
        self.port = server_port
        self.cookie = None

    def test(self):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) as socket_to_server:
            socket_to_server.connect((self.ip, self.port))
            request = dict()
            request['action'] = 'test'
            full_request = generate_request(request)
            socket_to_server.sendall(full_request)
            responseJson = JSONDecoder().decode(
                read_bytes(socket_to_server, struct.unpack('!Q', socket_to_server.recv(8))[0]).decode("utf-8"))
            return responseJson['test']

    def crawling(self, word: str, pages_start: int, pages_end: int):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) as socket_to_server:
            socket_to_server.connect((self.ip, self.port))
            request = dict()
            request['action'] = 'crawl zhiwang'
            request['word'] = word
            request['pages_start'] = str(pages_start)
            request['pages_end'] = str(pages_end)
            request['cookie'] = '2b0fd361bbf0b986fbc20d989a224d66fe9cb13a'
            full_request = generate_request(request)
            socket_to_server.sendall(full_request)
            responseJson = JSONDecoder().decode(
                read_bytes(socket_to_server, struct.unpack('!Q', socket_to_server.recv(8))[0]).decode("utf-8"))
            return responseJson['crawl zhiwang']

    def report_status(self, status: str):
        # status: free or busy
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) as socket_to_server:
            socket_to_server.connect((self.ip, self.port))
            request = dict()
            request['action'] = 'report_' + status
            request['spider_info'] = (ip, port)
            request['cookie'] = self.cookie
            full_request = generate_request(request)
            socket_to_server.sendall(full_request)
            responseJson = JSONDecoder().decode(
                read_bytes(socket_to_server, struct.unpack('!Q', socket_to_server.recv(8))[0]).decode("utf-8"))
            return responseJson['report_' + status]

    def end(self):
        """End the communication."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) as socket_to_server:
            socket_to_server.connect((self.ip, self.port))
            request = dict()
            request['action'] = 'end'
            full_request = generate_request(request)
            socket_to_server.sendall(full_request)
            print("end communication!")

    def login(self, user, password):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) as socket_to_server:
            socket_to_server.connect((self.ip, self.port))
            request = dict()
            request['action'] = 'login'
            request['user'] = user
            request['password'] = password
            full_request = generate_request(request)
            socket_to_server.sendall(full_request)
            responseJson = JSONDecoder().decode(
                read_bytes(socket_to_server, struct.unpack('!Q', socket_to_server.recv(8))[0]).decode("utf-8"))
            # the two literals are the server's "wrong user name" / "wrong password" login failure strings
            if responseJson['login'] not in ['用户名错误,登录失败', '密码错误,登录失败']:
                self.cookie = responseJson['login']
            return responseJson['login']

    def run(self) -> None:
        print(self.login('2', '2'))
        print(self.report_status('free'))
        print(self.crawling(input("word:"), pages_start=1, pages_end=4))  # [3,4)
        self.end()


def response():
    socket_to_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)
    socket_to_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    socket_to_server.bind(('', 7799))
    socket_to_server.listen()
    while True:
        server_socket, _ = socket_to_server.accept()
        request_header_size = struct.unpack("!Q", read_bytes(server_socket, 8))[0]
        request_map = json.JSONDecoder().decode(read_bytes(server_socket, request_header_size).decode("utf-8"))
        print(request_map)
        response_map = {'response1': 'I am replying...'}
        server_socket.sendall(generate_response(response_map))


def generate_response(response_map):
    response_binary = json.JSONEncoder().encode(response_map).encode("utf-8")
    response_binary_len = len(response_binary)
    response_binary_len_binary = struct.pack("!Q", response_binary_len)
    response_binary = response_binary_len_binary + response_binary
    return response_binary


download_task = Client(ip, port)
download_task.daemon = True
# download_task.start()
# download_task.join()
# server = Server(7777)
# server.daemon = True
# server.start()
response()
# server.join()

@@ -1,43 +0,0 @@
import os
import requests
from bs4 import BeautifulSoup

# crawler request header data
cookies = {
    'SINAGLOBAL': '6797875236621.702.1603159218040',
    'SUB': '_2AkMXbqMSf8NxqwJRmfkTzmnhboh1ygvEieKhMlLJJRMxHRl-yT9jqmg8tRB6PO6N_Rc_2FhPeZF2iThYO9DfkLUGpv4V',
    'SUBP': '0033WrSXqPxfM72-Ws9jqgMF55529P9D9Wh-nU-QNDs1Fu27p6nmwwiJ',
    '_s_tentry': 'www.baidu.com',
    'UOR': 'www.hfut.edu.cn,widget.weibo.com,www.baidu.com',
    'Apache': '7782025452543.054.1635925669528',
    'ULV': '1635925669554:15:1:1:7782025452543.054.1635925669528:1627316870256',
}
headers = {
    'Connection': 'keep-alive',
    'Cache-Control': 'max-age=0',
    'Upgrade-Insecure-Requests': '1',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.89 Safari/537.36 SLBrowser/7.0.0.6241 SLBChan/25',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
    'Sec-Fetch-Site': 'cross-site',
    'Sec-Fetch-Mode': 'navigate',
    'Sec-Fetch-User': '?1',
    'Sec-Fetch-Dest': 'document',
    'Accept-Language': 'zh-CN,zh;q=0.9',
}
params = (
    ('cate', 'realtimehot'),
)
# data storage (the file name means "Weibo trending searches")
fo = open("./微博热搜.txt", 'a', encoding="utf-8")
# fetch the page
response = requests.get('https://s.weibo.com/top/summary', headers=headers, params=params, cookies=cookies)
# parse the page
response.encoding = 'utf-8'
soup = BeautifulSoup(response.text, 'html.parser')
# the content to crawl
content = "#pl_top_realtimehot > table > tbody > tr > td.td-02 > a"
# clean the data
a = soup.select(content)
for i in range(0, len(a)):
    a[i] = a[i].text
    fo.write(a[i] + '\n')
fo.close()

@@ -1,75 +1,32 @@
 import socket
 import threading
-import json
-import struct
 from loguru import logger
-from dcs.tests.spider import Spider
-from dcs.tests.user_request_handler import Urh
-def generate_response(response):
-    response_binary = json.JSONEncoder().encode(response).encode("utf-8")
-    response_binary_len = len(response_binary)
-    response_binary_len_binary = struct.pack("!Q", response_binary_len)
-    response_binary = response_binary_len_binary + response_binary
-    return response_binary
+from dcs.tools.message_process import parse_request, check
+from conf.config import global_var
 class RequestHandler(threading.Thread):
-    def __init__(self, file_server, client_socket: 'socket.socket', request_map: 'dict'):
+    def __init__(self, client_socket: 'socket.socket'):
         super().__init__()
-        self.file_server = file_server
         self.client_socket = client_socket
-        self.request_map = request_map
-        self.daemon = True
-        pass
-    def test(self):
-        logger.info(f"[REQUEST] test")
-        response = {
-            'test': 'hello TEST'
-        }
-        self.client_socket.sendall(generate_response(response))
-        logger.info(f"[RESPONSE] test: {response['test']}")
-    def translate(self):
-        logger.info(f"[REQUEST] translate")
-        spider = Spider(self.request_map['word'])
-        response = {
-            'translate': spider.run()
-        }
-        self.client_socket.sendall(generate_response(response))
-        logger.info(f"[RESPONSE] translate: {response['translate']}")
-    def crawl_zhiwang(self):
-        logger.info(f"[REQUEST] crawl zhiwang")
-        try:
-            pages_start = int(self.request_map['pages_start'])
-            pages_end = int(self.request_map['pages_end'])
-        except:
-            pages_start = 1
-            pages_end = 1
-        spider = Spider(self.request_map['word'], pages_start=pages_start, pages_end=pages_end)
-        spider.run()
-        response = {
-            'crawl zhiwang': 'success'  # TODO
-        }
-        self.client_socket.sendall(generate_response(response))
-        logger.info(f"[RESPONSE] crawl zhiwang: {response['crawl zhiwang']}")
     def run(self) -> None:
         try:
-            if self.request_map['action'] == 'test':
-                self.test()
-                self.client_socket.close()
-            elif self.request_map['action'] == 'translate':
-                self.translate()
-            elif self.request_map['action'] == 'crawl zhiwang':
-                self.crawl_zhiwang()
-                self.client_socket.close()
-            elif self.request_map['action'] in ['report_free', 'login', 'register']:
-                urh = Urh(self.request_map, self.client_socket)
-                urh.start()
-        except:
-            pass
+            while True:
+                request_map = parse_request(self.client_socket)
+                if request_map['action'] == 'end':
+                    logger.info(f"[REQUEST] end")
+                    logger.debug(f"communication over from {self.client_socket.getpeername()}!")
+                    break
+                elif request_map['action'] == 'crawl zhiwang':
+                    chk_res = check(request_map['cookie'])
+                    if chk_res is None:
+                        logger.warning("user info error!")
+                        break
+                    global_var.spider.add_task(request_map, self.client_socket)
+                elif request_map['action'] in ['report_free', 'login', 'register']:
+                    global_var.up.add_request(request_map, self.client_socket)
+        except Exception as e:
+            print(str(e))

@@ -1,72 +0,0 @@
import socket
import threading
import struct
from json import JSONEncoder, JSONDecoder
from time import sleep
from loguru import logger
from collections import deque


def read_bytes(s: 'socket.socket', size: 'int') -> 'bytes':
    data = ''.encode('utf-8')
    while len(data) < size:
        rsp_data = s.recv(size - len(data))
        data += rsp_data
        if len(rsp_data) == 0:
            break
    return data


def generate_request(request) -> 'bytes':
    request_bytes = JSONEncoder().encode(request).encode("utf-8")
    return struct.pack("!Q", len(request_bytes)) + request_bytes


class Requester(threading.Thread):
    def __init__(self, request_map: 'dict', client_address: tuple):
        super().__init__()
        self.request_map = request_map
        self.daemon = True
        self.client_address = client_address
        self.responseJson = None
        pass

    def request(self):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) as socket_to_client:
            socket_to_client.connect(self.client_address)
            socket_to_client.sendall(generate_request(self.request_map))
            self.responseJson = JSONDecoder().decode(
                read_bytes(socket_to_client, struct.unpack('!Q', socket_to_client.recv(8))[0]).decode("utf-8"))
            res.append(self.responseJson)
            return res

    def get(self):
        try:
            return self.request()
        except:
            return None

    def run(self) -> None:
        self.request()


if __name__ == '__main__':
    address = ('127.0.0.1', 7798)
    address1 = ('127.0.0.1', 7799)
    my_request = {'request': 'I am asking you...'}
    my_request1 = {'request1': 'I am asking you...'}
    res = deque()
    requester = Requester(my_request, address)
    requester1 = Requester(my_request1, address1)
    requester1.start()
    requester.start()
    requester1.join()
    requester.join()
    sleep(2)
    # print(requester1.get())
    # print(requester.get())
    print(res)

@@ -1,41 +0,0 @@
import threading
import socket
import json
import struct
from dcs.tests.requestHandler import RequestHandler
from loguru import logger


def read_bytes(s: 'socket.socket', size: 'int') -> 'bytes':
    data = ''.encode('utf-8')
    while len(data) < size:
        rsp_data = s.recv(size - len(data))
        data += rsp_data
        if len(rsp_data) == 0:
            break
    return data


class Server(threading.Thread):
    def __init__(self, port: 'int'):
        super().__init__()
        self.port: 'int' = port
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind(('', port))
        self.buffer_size = 8 * 1024 * 1024

    def run(self) -> None:
        self.server_socket.listen()
        while True:
            client_socket, _ = self.server_socket.accept()
            request_header_size = struct.unpack("!Q", read_bytes(client_socket, 8))[0]
            request_map = json.JSONDecoder().decode(read_bytes(client_socket, request_header_size).decode("utf-8"))
            # the 'end' request must be handled in the main thread, otherwise shutdown is not responsive
            if request_map['action'] == 'end':
                logger.info(f"[REQUEST] end")
                logger.warning("communication over!")
                break
            r = RequestHandler(self, client_socket, request_map)
            r.start()
        self.server_socket.close()

@@ -1,158 +0,0 @@
import csv
import threading
import socket
from json import JSONEncoder, JSONDecoder
import struct
import dcs.tests.config
from msedge.selenium_tools import Edge
from msedge.selenium_tools import EdgeOptions
from dcs.tests.zhiwang import *
from loguru import logger
from dcs.tests.database import write_result2database, get_last_crawl_id, create_crawl_result_table


def translate(word):
    url = 'http://fanyi.youdao.com/translate?smartresult=dict&smartresult=rule'
    data = {'i': word,
            'from': 'AUTO',
            'to': 'AUTO',
            'smartresult': 'dict',
            'client': 'fanyideskweb',
            'doctype': 'json',
            'version': '2.1',
            'keyfrom': 'fanyi.web',
            'action': 'FY_BY_REALTIME',
            'typoResult': 'false'}
    r = requests.post(url, data)
    answer = r.json()
    result = answer['translateResult'][0][0]['tgt']
    return result


def crawl_zhiwang(word, pages_start=1, pages_end=1):
    edge_options = EdgeOptions()
    edge_options.use_chromium = True
    edge_options.add_argument('headless')
    driver = Edge(options=edge_options, executable_path=r'G:\Users\god\PycharmProjects\dcs\bin\msedgedriver.exe')
    soup = driver_open(driver, word)  # search for the word
    papers = []  # holds the crawled papers
    table_name = 'wufayuan_crawl_result'
    create_crawl_result_table(table_name=table_name)
    last_crawl_id = get_last_crawl_id(table_name=table_name)
    paper_id = 0
    # crawl the first page
    if pages_start == 1:
        spider(driver, soup, papers)
        pages_start += 1
    while paper_id < len(papers):
        write2database(papers[paper_id], table_name=table_name, last_crawl_id=last_crawl_id)
        paper_id += 1
    for pn in range(pages_start, pages_end):
        content = change_page(driver, pn)
        spider(driver, content, papers)
        while paper_id < len(papers):
            write2database(papers[paper_id], table_name=table_name, last_crawl_id=last_crawl_id)
            paper_id += 1
    driver.close()


def write2database(paper: Paper, table_name: str, last_crawl_id: int):
    for author in paper.authors:
        if author.name:
            write_result2database([author.name, author.college, author.major, paper.title], table_name, last_crawl_id)


def write2csv(papers: list, file_name='./paper_author.csv'):
    # write to file
    f_papers_authors = open(file_name, 'w', encoding='utf-8', newline='')
    writer_p_a = csv.writer(f_papers_authors)  # build a csv writer on the file object
    writer_p_a.writerow(["name", "college", "major", "paper"])  # csv header row
    # iterate over every paper
    for paper in papers:
        # write to the paper_author.csv file
        for author in paper.authors:
            if author.name:
                # print(author + " ")
                writer_p_a.writerow([author.name, author.college, author.major, paper.title])
    # close the file
    f_papers_authors.close()


def read_bytes(s: 'socket.socket', size: 'int') -> 'bytes':
    """
    Read size bytes from the socket.
    :param s: the socket
    :param size: the number of bytes to read
    :return: the bytes read; if the socket closes early, the data returned may be shorter than size
    """
    data = ''.encode('utf-8')
    while len(data) < size:
        rsp_data = s.recv(size - len(data))
        data += rsp_data
        if len(rsp_data) == 0:
            break
    return data


def generate_request(request) -> 'bytes':
    """
    Build a request from the given dict.
    A request consists of an 8-byte header length plus the header data.
    :param request: dict
    :return: the request as bytes
    """
    request_bytes = JSONEncoder().encode(request).encode("utf-8")
    return struct.pack("!Q", len(request_bytes)) + request_bytes


class Spider(threading.Thread):
    def __init__(self, word: str, pages_start=1, pages_end=1):
        super().__init__()
        self.word = word
        self.daemon = True
        self.pages_start = pages_start
        self.pages_end = pages_end
        pass

    def crawling(self, word: str, pages_start: int, pages_end: int, client_socket: socket.socket):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) as socket_to_server:
            socket_to_server.connect((self.ip, self.port))
            request = dict()
            request['action'] = 'crawl zhiwang'
            request['word'] = word
            request['pages_start'] = str(pages_start)
            request['pages_end'] = str(pages_end)
            full_request = generate_request(request)
            socket_to_server.sendall(full_request)
            responseJson = JSONDecoder().decode(
                read_bytes(socket_to_server, struct.unpack('!Q', socket_to_server.recv(8))[0]).decode("utf-8"))
            return responseJson['crawl zhiwang']

    def distribute_spiders(self):
        free_sockets = dcs.tests.config.get_free_sockets()
        qt = (self.pages_end - self.pages_start) // (len(free_sockets) + 1) // len(free_sockets)
        for st in range(len(free_sockets)):
            request_map = {'action': 'crawl_zhiwang', 'pages_start': f'{st}', 'pages_end': f'{st+qt}'}
            self.pages_start = st + qt
            print(request_map)
            print(st, self.crawling(self.word, st, st + qt, free_sockets[st]))
            '''
            r = RequestHandler(self, free_sockets[st], request_map)
            r.start()
            '''

    def run(self) -> None:
        logger.info('crawling...')
        self.distribute_spiders()
        crawl_zhiwang(word=self.word, pages_start=self.pages_start, pages_end=self.pages_end)

@@ -0,0 +1,232 @@
import csv
import socket
import threading
from typing import Optional
from loguru import logger
from msedge.selenium_tools import Edge
from msedge.selenium_tools import EdgeOptions
from conf.config import global_var, get_free_sockets, get_crawl_result, get_by_cookie, set_state_socket
from dcs.tools.database import get_crawl_result_by_crawl_id, write_result2database
from dcs.tools.database import get_last_crawl_id, create_crawl_result_table
from dcs.tests.zhiwang import *


def translate(word):
    url = 'http://fanyi.youdao.com/translate?smartresult=dict&smartresult=rule'
    data = {'i': word,
            'from': 'AUTO',
            'to': 'AUTO',
            'smartresult': 'dict',
            'client': 'fanyideskweb',
            'doctype': 'json',
            'version': '2.1',
            'keyfrom': 'fanyi.web',
            'action': 'FY_BY_REALTIME',
            'typoResult': 'false'}
    r = requests.post(url, data)
    answer = r.json()
    result = answer['translateResult'][0][0]['tgt']
    return result


def write2database(paper: Paper, table_name: str, last_crawl_id: int):
    logger.info(f'writing to database: {paper.title}')
    for author in paper.authors:
        if author.name:
            write_result2database([author.name, author.college, author.major, paper.title], table_name, last_crawl_id)


def write2csv(papers: list, file_name='./paper_author.csv'):
    # write to file
    f_papers_authors = open(file_name, 'w', encoding='utf-8', newline='')
    writer_p_a = csv.writer(f_papers_authors)  # build a csv writer on the file object
    writer_p_a.writerow(["name", "college", "major", "paper"])  # csv header row
    # iterate over every paper
    for paper in papers:
        # write to the paper_author.csv file
        for author in paper.authors:
            if author.name:
                # print(author + " ")
                writer_p_a.writerow([author.name, author.college, author.major, paper.title])
    # close the file
    f_papers_authors.close()


class Crawler(threading.Thread):
    def __init__(self, partial_task: 'Spider_partial_task'):
        super(Crawler, self).__init__()
        self.partial_task = partial_task

    def crawl_zhiwang(self, user_name=None):
        edge_options = EdgeOptions()
        edge_options.use_chromium = True
        # edge_options.add_argument('headless')
        driver = Edge(options=edge_options, executable_path=r'G:\Users\god\PycharmProjects\dcs\bin\msedgedriver.exe')
        soup = driver_open(driver, self.partial_task.word)  # search for the word
        papers = []  # holds the crawled papers
        table_name = f'{user_name}_crawl_result'
        create_crawl_result_table(table_name=table_name)
        last_crawl_id = get_last_crawl_id(table_name=table_name)
        self.partial_task.crawl_id = last_crawl_id + 1
        paper_id = 0
        # crawl the first page
        if self.partial_task.pages_start == 1:
            spider(driver, soup, papers)
            self.partial_task.pages_start += 1
        while paper_id < len(papers):
            write2database(papers[paper_id], table_name=table_name, last_crawl_id=last_crawl_id)
            paper_id += 1
        while self.partial_task.pages_start < self.partial_task.pages_end:
            content = change_page(driver, self.partial_task.pages_start)
            spider(driver, content, papers)
            while paper_id < len(papers):
                write2database(papers[paper_id], table_name=table_name, last_crawl_id=last_crawl_id)
                paper_id += 1
            self.partial_task.pages_start += 1
        driver.close()

    def test_simulation(self, user_name):
        table_name = f'{user_name}_crawl_result'
        create_crawl_result_table(table_name=table_name)
        last_crawl_id = get_last_crawl_id(table_name=table_name)
        self.partial_task.crawl_id = last_crawl_id + 1
        # simulated crawling
        logger.debug('simulation crawling...')
        paper = Paper('test', [Author('test', 'test', 'test')])
        write2database(paper, table_name=table_name, last_crawl_id=last_crawl_id)
        write2database(paper, table_name=table_name, last_crawl_id=last_crawl_id)
        write2database(paper, table_name=table_name, last_crawl_id=last_crawl_id)
        # over
        self.partial_task.pages_start = self.partial_task.pages_end

    def run(self) -> None:
        try:
            # self.crawl_zhiwang(user_name=self.partial_task.cui.user_name)
            self.test_simulation(user_name=self.partial_task.cui.user_name)
        except Exception as e:
            print(e)
        finally:
            logger.info(f'partial crawl task finished: {str(self.partial_task)}')
            self.partial_task.thread = None
            # self.partial_task.pages_start = self.partial_task.pages_end


class Spider_partial_task:
    def __init__(self, full_task: 'Spider_task', request_map: dict):
        self.full_task: Spider_task = full_task
        self.request_map = request_map
        self.thread: 'threading.Thread|None' = None
        self.word = self.request_map['word']
        self.pages_start = self.request_map['pages_start']
        self.pages_end = self.request_map['pages_end']
        self.cui = get_by_cookie(self.request_map['cookie'])
        self.task_type: Optional[str] = None
        self.crawl_id = None

    def is_partial_task_crawl_completely(self):
        return self.pages_start == self.pages_end

    def __str__(self):
        return f'{self.full_task.client_socket.getpeername(), self.request_map}'


class Spider_task(threading.Thread):
    def __init__(self, client_socket: socket.socket, request_map: dict):
        super().__init__()
        self.client_socket = client_socket
        self.request_map = request_map
        self.partial_tasks: list[Spider_partial_task] = []
        self.const_page = 3

    def distribute_task(self):
        # distribute tasks, 3 pages as a task
        # [pages_start, pages_end), like [1,3) means 1,2 page
        logger.info(f'distributing task: {self.client_socket.getpeername(), self.request_map}')
        pages_start = self.request_map['pages_start']
        pages_end = self.request_map['pages_end']
        while pages_start < pages_end:
            tmp = self.request_map.copy()
            tmp['pages_start'] = pages_start
            if pages_start + self.const_page < pages_end:
                pages_start += self.const_page
            else:
                pages_start = pages_end
            tmp['pages_end'] = pages_start
            # self.tasks.append((client_socket, tmp))
            self.partial_tasks.append(Spider_partial_task(self, tmp))
        logger.debug(self.partial_tasks)

    def is_all_task_crawled(self):
        for task in self.partial_tasks:
            # print(task.task_type, task.is_partial_task_crawl_completely())
            if not task.is_partial_task_crawl_completely():
                return False
        return True

    def compose_result(self):
        logger.debug('composing task...')
        result = dict()
        remote_result = get_crawl_result(self.request_map['cookie'])
        for result_map in remote_result:
            result.update(result_map)
        for task in self.partial_tasks:
            # print(task.task_type)
            if task.task_type == 'local':
                local_result = dict()
                local_result_database = get_crawl_result_by_crawl_id(
                    f"{get_by_cookie(task.request_map['cookie']).user_name}_crawl_result",
                    task.crawl_id)
                initial_id = local_result_database[0][0]
                for res in local_result_database:
                    local_result.update({res[0] - initial_id + 1: {'name': res[1], 'college': res[2], 'major': res[3], 'paper': res[4]}})
                logger.debug(local_result)
                result.update(local_result)
        global_var.communicator.add_info('response', self.client_socket.getpeername(), result)

    def run(self) -> None:
        global_var.communicator.add_response('crawling state', self.client_socket,
                                             {'crawling state': 'starting, please wait...'})
        self.distribute_task()
        free_remote_nodes = list(get_free_sockets())
        logger.debug(free_remote_nodes)
        while True:  # necessary, otherwise the for loop would only execute once
            for task in self.partial_tasks:
                if task.is_partial_task_crawl_completely():
                    continue
                else:
                    current_task_thread = task.thread
                    if current_task_thread is None:
                        for f_node in free_remote_nodes:
                            # print(free_remote_nodes)
                            address = f_node.getpeername()
                            # address = (address[0], address[1] + 1)
                            logger.debug('generating remote task')
                            task.thread = global_var.requester
                            task.task_type = 'remote'
                            global_var.requester.get(address, task)
                            free_remote_nodes.remove(f_node)  # TODO
                            set_state_socket(f_node, 'busy')
                            break
                        else:
                            logger.debug('generating local task')
                            crawler = Crawler(task)
                            task.thread = crawler
                            task.task_type = 'local'
                            crawler.start()
            if self.is_all_task_crawled():
                break
            # sleep(3)
        self.compose_result()
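
Spider_task.distribute_task above splits the half-open page range [pages_start, pages_end) into partial tasks of const_page = 3 pages each, so a request for pages [1, 10) becomes [1, 4), [4, 7), [7, 10). The splitting loop, extracted as a standalone sketch:

def split_pages(pages_start: int, pages_end: int, const_page: int = 3) -> list[tuple[int, int]]:
    # [pages_start, pages_end) half-open ranges, const_page pages per partial task
    chunks = []
    while pages_start < pages_end:
        start = pages_start
        pages_start = min(pages_start + const_page, pages_end)
        chunks.append((start, pages_start))
    return chunks


print(split_pages(1, 10))  # [(1, 4), (4, 7), (7, 10)]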

@@ -1,19 +1,9 @@
 import socket
 import threading
-import json
-import struct
-import dcs.tests.config as config
-import dcs.tests.database as database
+import conf.config as config
+import dcs.tools.database as database
 from loguru import logger
+from conf.config import global_var
-def generate_response(response):
-    response_binary = json.JSONEncoder().encode(response).encode("utf-8")
-    response_binary_len = len(response_binary)
-    response_binary_len_binary = struct.pack("!Q", response_binary_len)
-    response_binary = response_binary_len_binary + response_binary
-    return response_binary
 class Urh(threading.Thread):
@@ -24,11 +14,12 @@ class Urh(threading.Thread):
     def report_state(self, state):
         logger.info(f"[REQUEST] report free")
-        config.set_state_socket(self.request_map['cookie'], state)
+        config.set_state_socket(self.client_socket, state)
         response = {
             'report_free': 'success marked ' + str(self.request_map['cookie'])
         }
-        self.client_socket.sendall(generate_response(response))
+        # self.client_socket.sendall(generate_response(response))
+        global_var.communicator.add_response('report_free', self.client_socket, response)
         logger.info(f"[RESPONSE] report free: {response['report_free']}")
     def login(self, user, password, st):
@@ -36,10 +27,11 @@ class Urh(threading.Thread):
         database.mysql_conn()
         response = database.login(user, password, st)
         response = {
-            'login': response
+            'cookie': response
         }
-        self.client_socket.sendall(generate_response(response))
-        logger.info(f"[RESPONSE] login: {response['login']}")
+        # self.client_socket.sendall(generate_response(response))
+        global_var.communicator.add_response('cookie', self.client_socket, response)
+        logger.info(f"[RESPONSE] cookie: {response['cookie']}")
     def register(self, user, password):
         logger.info(f"[REQUEST] register")
@@ -48,7 +40,8 @@ class Urh(threading.Thread):
         response = {
             'register': response
         }
-        self.client_socket.sendall(generate_response(response))
+        # self.client_socket.sendall(generate_response(response))
+        global_var.communicator.add_response('register', self.client_socket, response)
         logger.info(f"[RESPONSE] register: {response['register']}")
     def run(self) -> None:

@@ -1,9 +1,6 @@
-'''
-CNKI (zhiwang) paper data crawling
-'''
+# CNKI (zhiwang) paper data crawling
 from bs4 import BeautifulSoup
-from dcs.tests.database import write_result2database
 import time
 import requests
@@ -37,8 +34,7 @@ def driver_open(driver, key_word):
     driver.find_element(by=By.CSS_SELECTOR, value='#txt_SearchText').send_keys(key_word)
     time.sleep(2)
     # click the search button
-    driver.find_element(by=By.CSS_SELECTOR, value=
-                        'body > div.wrapper.section1 > div.searchmain > div > div.input-box > input.search-btn').click()
+    driver.find_element(by=By.CSS_SELECTOR, value='body > div.wrapper.section1 > div.searchmain > div > div.input-box > input.search-btn').click()
     time.sleep(5)
     content = driver.page_source.encode('utf-8')
     # driver.close()

@@ -1,7 +1,7 @@
 from hashlib import *
 import pymysql
-import dcs.tests.config as config
-import dcs.tests.cookie as cookie
+import conf.config as config
+import dcs.tools.cookie as cookie
 # get a database connection object
@@ -136,6 +136,20 @@ def drop_table(table_name: str):
         print(e)
+def get_crawl_result_by_crawl_id(table_name: str, crawl_id: int):
+    try:
+        conn = mysql_conn()
+        cur = conn.cursor()
+        select_sql = f'select id, name, college, major, paper from {table_name} where crawl_id = {crawl_id}'
+        cur.execute(select_sql)
+        result = cur.fetchall()
+        cur.close()
+        conn.close()
+        return result
+    except Exception as e:
+        print(e)
 def create_table(create_sql: str):
     try:
         conn = mysql_conn()
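
The new get_crawl_result_by_crawl_id builds its SQL by interpolating crawl_id into an f-string; with pymysql (already imported in this file) the value can be passed as a query parameter instead. A sketch of that variant; the table name cannot be a placeholder, so it must keep coming from trusted code, as it does here:

import pymysql


def get_crawl_result_by_crawl_id_param(conn: pymysql.connections.Connection,
                                       table_name: str, crawl_id: int):
    # %s placeholders let the driver escape crawl_id; identifiers like the
    # table name cannot be placeholders, so table_name must stay trusted.
    select_sql = f'select id, name, college, major, paper from {table_name} where crawl_id = %s'
    with conn.cursor() as cur:
        cur.execute(select_sql, (crawl_id,))
        return cur.fetchall()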

@@ -0,0 +1,48 @@
import socket
import json
import struct
from json import JSONEncoder
from conf.config import exists


def parse_request(client_socket: socket.socket):
    request_header_size = struct.unpack("!Q", read_bytes(client_socket, 8))[0]
    request_map = json.JSONDecoder().decode(read_bytes(client_socket, request_header_size).decode("utf-8"))
    return request_map


def generate_response(response):
    response_binary = json.JSONEncoder().encode(response).encode("utf-8")
    response_binary_len = len(response_binary)
    response_binary_len_binary = struct.pack("!Q", response_binary_len)
    response_binary = response_binary_len_binary + response_binary
    return response_binary


def read_bytes(s: 'socket.socket', size: 'int') -> 'bytes':
    data = ''.encode('utf-8')
    while len(data) < size:
        rsp_data = s.recv(size - len(data))
        data += rsp_data
        if len(rsp_data) == 0:
            break
    return data


def check(cookie: str):
    if exists(cookie):
        return cookie
    return None  # TODO: if user error, return None, else return cookie
    pass


def generate_request(request) -> 'bytes':
    """
    Build a request from the given dict.
    A request consists of an 8-byte header length plus the header data.
    :param request: dict
    :return: the request as bytes
    """
    request_bytes = JSONEncoder().encode(request).encode("utf-8")
    return struct.pack("!Q", len(request_bytes)) + request_bytes

@@ -0,0 +1,22 @@
import socket
import threading
from dcs.tests.user_request_handler import Urh
from loguru import logger


class UP(threading.Thread):
    def __init__(self):
        super(UP, self).__init__()
        self.requests: list[tuple[socket.socket, dict]] = []

    def add_request(self, request_map: dict, client_socket: socket.socket):
        self.requests.append((client_socket, request_map))

    def run(self) -> None:
        while True:
            for request in self.requests:
                logger.info(f'processing user request...')
                urh = Urh(request[1], request[0])
                urh.start()
                self.requests.remove(request)
            pass