初步完成服务器向多个客户端递交爬虫请求,客户端返回爬虫结果到全局外部变量

master
wufayuan 3 years ago
parent 64a607e50b
commit a1a73aa412

@ -1,5 +1,7 @@
# -*- coding: UTF-8 -*- # -*- coding: UTF-8 -*-
import json
import struct import struct
import threading
from threading import Thread from threading import Thread
import socket import socket
from json import JSONEncoder, JSONDecoder from json import JSONEncoder, JSONDecoder
@ -152,11 +154,35 @@ class Client(Thread):
self.end() self.end()
def response():
socket_to_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)
socket_to_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
socket_to_server.bind(('', 7798))
socket_to_server.listen()
while True:
server_socket, _ = socket_to_server.accept()
request_header_size = struct.unpack("!Q", read_bytes(server_socket, 8))[0]
request_map = json.JSONDecoder().decode(read_bytes(server_socket, request_header_size).decode("utf-8"))
print(request_map)
response_map = {'response': 'I am replying...'}
server_socket.sendall(generate_response(response_map))
def generate_response(response_map):
response_binary = json.JSONEncoder().encode(response_map).encode("utf-8")
response_binary_len = len(response_binary)
response_binary_len_binary = struct.pack("!Q", response_binary_len)
response_binary = response_binary_len_binary + response_binary
return response_binary
download_task = Client(ip, port) download_task = Client(ip, port)
download_task.daemon = True download_task.daemon = True
download_task.start() # download_task.start()
download_task.join() # download_task.join()
server = Server(7777) server = Server(7777)
server.daemon = True server.daemon = True
server.start() server.start()
response()
server.join() server.join()

@ -0,0 +1,188 @@
# -*- coding: UTF-8 -*-
import json
import struct
import threading
from threading import Thread
import socket
from json import JSONEncoder, JSONDecoder
import sys
from dcs.tests.server import Server
# -------------------------------配置--------------------------------------------
# ------------------------------config--------------------------------------------
if len(sys.argv) < 2:
ip = "127.0.0.1" # server的ip
else:
ip = sys.argv[1]
port = 7777 # server的port
def read_bytes(s: 'socket.socket', size: 'int') -> 'bytes':
"""
从socket读取size个字节
:param s:套接字
:param size:要读取的大小
:return:读取的字节数在遇到套接字关闭的情况下返回的数据的长度可能小于 size
"""
data = ''.encode('utf-8')
while len(data) < size:
rsp_data = s.recv(size - len(data))
data += rsp_data
if len(rsp_data) == 0:
break
return data
def generate_request(request) -> 'bytes':
"""
根据传入的dict生成请求
请求包含 8字节头长度+头数据
:param request: dict
:return: bytes 请求数据
"""
request_bytes = JSONEncoder().encode(request).encode("utf-8")
return struct.pack("!Q", len(request_bytes)) + request_bytes
class Client(Thread):
def __init__(self, server_ip: str, server_port: int) -> None:
"""
:param server_ip: 服务器IP
:param server_port: 服务器端口
"""
super().__init__()
self.ip = server_ip
self.port = server_port
self.cookie = None
def test(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) as socket_to_server:
socket_to_server.connect((self.ip, self.port))
request = dict()
request['action'] = 'test'
full_request = generate_request(request)
socket_to_server.sendall(full_request)
responseJson = JSONDecoder().decode(
read_bytes(socket_to_server, struct.unpack('!Q', socket_to_server.recv(8))[0]).decode(
"utf-8"))
return responseJson['test']
def crawling(self, word: str, pages_start: int, pages_end: int):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) as socket_to_server:
socket_to_server.connect((self.ip, self.port))
request = dict()
request['action'] = 'crawl zhiwang'
request['word'] = word
request['pages_start'] = str(pages_start)
request['pages_end'] = str(pages_end)
request['cookie'] = '2b0fd361bbf0b986fbc20d989a224d66fe9cb13a'
full_request = generate_request(request)
socket_to_server.sendall(full_request)
responseJson = JSONDecoder().decode(
read_bytes(socket_to_server, struct.unpack('!Q', socket_to_server.recv(8))[0]).decode(
"utf-8"))
return responseJson['crawl zhiwang']
def report_status(self, status: str):
# status: free or busy
with socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) as socket_to_server:
socket_to_server.connect((self.ip, self.port))
request = dict()
request['action'] = 'report_' + status
request['spider_info'] = (ip, port)
request['cookie'] = self.cookie
full_request = generate_request(request)
socket_to_server.sendall(full_request)
responseJson = JSONDecoder().decode(
read_bytes(socket_to_server, struct.unpack('!Q', socket_to_server.recv(8))[0]).decode(
"utf-8"))
return responseJson['report_' + status]
def end(self):
"""
结束通信
:return:
"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) as socket_to_server:
socket_to_server.connect((self.ip, self.port))
request = dict()
request['action'] = 'end'
full_request = generate_request(request)
socket_to_server.sendall(full_request)
print("end communication!")
def login(self, user, password):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) as socket_to_server:
socket_to_server.connect((self.ip, self.port))
request = dict()
request['action'] = 'login'
request['user'] = user
request['password'] = password
full_request = generate_request(request)
socket_to_server.sendall(full_request)
responseJson = JSONDecoder().decode(
read_bytes(socket_to_server, struct.unpack('!Q', socket_to_server.recv(8))[0]).decode(
"utf-8"))
if responseJson['login'] not in ['用户名错误,登录失败', '密码错误,登录失败']:
self.cookie = responseJson['login']
return responseJson['login']
def run(self) -> None:
print(self.login('2', '2'))
print(self.report_status('free'))
print(self.crawling(input("word:"), pages_start=1, pages_end=4)) # [3,4)
self.end()
def response():
socket_to_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)
socket_to_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
socket_to_server.bind(('', 7799))
socket_to_server.listen()
while True:
server_socket, _ = socket_to_server.accept()
request_header_size = struct.unpack("!Q", read_bytes(server_socket, 8))[0]
request_map = json.JSONDecoder().decode(read_bytes(server_socket, request_header_size).decode("utf-8"))
print(request_map)
response_map = {'response1': 'I am replying...'}
server_socket.sendall(generate_response(response_map))
def generate_response(response_map):
response_binary = json.JSONEncoder().encode(response_map).encode("utf-8")
response_binary_len = len(response_binary)
response_binary_len_binary = struct.pack("!Q", response_binary_len)
response_binary = response_binary_len_binary + response_binary
return response_binary
download_task = Client(ip, port)
download_task.daemon = True
# download_task.start()
# download_task.join()
# server = Server(7777)
# server.daemon = True
# server.start()
response()
# server.join()

@ -1,4 +1,5 @@
import socket import socket
from collections import deque
class CUI: class CUI:
@ -9,6 +10,7 @@ class CUI:
self.state = state self.state = state
self.cookie = cookie self.cookie = cookie
self.socket = st self.socket = st
self.crawl_result = deque()
class global_var: class global_var:

@ -0,0 +1,72 @@
import socket
import threading
import struct
from json import JSONEncoder, JSONDecoder
from time import sleep
from loguru import logger
from collections import deque
def read_bytes(s: 'socket.socket', size: 'int') -> 'bytes':
data = ''.encode('utf-8')
while len(data) < size:
rsp_data = s.recv(size - len(data))
data += rsp_data
if len(rsp_data) == 0:
break
return data
def generate_request(request) -> 'bytes':
request_bytes = JSONEncoder().encode(request).encode("utf-8")
return struct.pack("!Q", len(request_bytes)) + request_bytes
class Requester(threading.Thread):
def __init__(self, request_map: 'dict', client_address: tuple):
super().__init__()
self.request_map = request_map
self.daemon = True
self.client_address = client_address
self.responseJson = None
pass
def request(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) as socket_to_client:
socket_to_client.connect(self.client_address)
socket_to_client.sendall(generate_request(self.request_map))
self.responseJson = JSONDecoder().decode(
read_bytes(socket_to_client, struct.unpack('!Q', socket_to_client.recv(8))[0]).decode(
"utf-8"))
res.append(self.responseJson)
return res
def get(self):
try:
return self.request()
except:
return None
def run(self) -> None:
self.request()
if __name__ == '__main__':
address = ('127.0.0.1', 7798)
address1 = ('127.0.0.1', 7799)
my_request = {'request': 'I am asking you...'}
my_request1 = {'request1': 'I am asking you...'}
res = deque()
requester = Requester(my_request, address)
requester1 = Requester(my_request1, address1)
requester1.start()
requester.start()
requester1.join()
requester.join()
sleep(2)
# print(requester1.get())
# print(requester.get())
print(res)
Loading…
Cancel
Save