You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
dcs/dcs/requester.py

87 lines
2.8 KiB

import socket
import struct
import threading
from collections import deque
from json import JSONEncoder, JSONDecoder
from time import sleep
from loguru import logger
from conf.config import set_crawl_result
from dcs.tests.spider_task import Spider_partial_task
def read_bytes(s: 'socket.socket', size: 'int') -> 'bytes':
    """Read up to ``size`` bytes from ``s``, looping over partial reads.

    ``recv`` may hand back fewer bytes than requested, so it is called
    repeatedly until ``size`` bytes have been collected or the peer signals
    end-of-stream by returning an empty chunk.

    :param s: object exposing ``recv(n) -> bytes`` (normally a connected socket).
    :param size: number of bytes wanted; a value <= 0 yields ``b''``.
    :return: the bytes received. This is shorter than ``size`` only when the
        stream ended early; callers must check the length themselves.
    """
    # Collect chunks and join once: repeated ``bytes +=`` re-copies the
    # whole buffer on every iteration.
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = s.recv(remaining)
        if not chunk:
            # Empty read: the peer closed the connection.
            break
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)
def generate_request(request) -> 'bytes':
    """Serialize ``request`` into a length-prefixed JSON frame.

    The frame is an 8-byte unsigned big-endian length (``!Q``) followed by
    the UTF-8 encoded JSON text of ``request``.

    :param request: any object the default ``JSONEncoder`` can encode.
    :return: header plus payload, ready to be written to a socket.
    """
    payload = JSONEncoder().encode(request).encode("utf-8")
    header = struct.pack("!Q", len(payload))
    return header + payload
class Requester(threading.Thread):
    """Hands crawl requests to remote clients, one ``Req`` thread per request.

    The class is itself a daemon thread but does not override ``run``; the
    network work happens in the ``Req`` threads started by :meth:`get`.
    """

    def __init__(self):
        super().__init__(daemon=True)
        # Every Req started through get() is kept here; nothing in this
        # class removes entries.
        self.reqs = []

    def is_remote_task_complete(self, client_address, request_map):
        """Placeholder with no implementation yet; always returns ``None``."""
        return None

    def get(self, client_address, task: Spider_partial_task):
        """Start a background ``Req`` that sends ``task`` to ``client_address``.

        :param client_address: address passed to ``socket.connect`` by the worker.
        :param task: partial spider task whose ``request_map`` is sent.
        :return: ``None``; the call does not wait for the remote reply.
        """
        logger.info(f'sending crawl request to {str(client_address)}')
        worker = Req(client_address, task)
        self.reqs.append(worker)
        worker.start()
class Req(threading.Thread):
    """Thread that sends one crawl request to a remote client and stores the reply.

    Wire format in both directions: an 8-byte unsigned big-endian length
    (``!Q``) followed by that many bytes of UTF-8 JSON.
    """

    def __init__(self, client_address, task: Spider_partial_task):
        """
        :param client_address: address handed to ``socket.connect``.
        :param task: partial spider task; its ``request_map`` is the request body.
        """
        super(Req, self).__init__()
        self.client_address = client_address
        self.task: Spider_partial_task = task
        # Same object as task.request_map, so run() mutates the task's map.
        self.request_map = task.request_map
        # Decoded reply (minus its 'cookie' entry) once run() has finished.
        self.responseJson = None

    def run(self) -> None:
        """Send the request, read the framed JSON reply and record the result.

        :raises ConnectionError: if the peer closes the connection before the
            full length header or the full body has been received.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) as socket_to_client:
            socket_to_client.connect(self.client_address)
            self.request_map.update({'type': 'request'})
            socket_to_client.sendall(generate_request(self.request_map))

            # A single recv(8) may return fewer than 8 bytes on a stream
            # socket; read the header through the looping helper instead.
            header = read_bytes(socket_to_client, 8)
            if len(header) < 8:
                raise ConnectionError(
                    f'connection closed while reading response header from {self.client_address}')
            (body_length,) = struct.unpack('!Q', header)

            body = read_bytes(socket_to_client, body_length)
            if len(body) < body_length:
                raise ConnectionError(
                    f'connection closed after {len(body)} of {body_length} response bytes '
                    f'from {self.client_address}')

            self.responseJson = JSONDecoder().decode(body.decode("utf-8"))
            # The cookie is used as the first argument to set_crawl_result
            # and is stripped from the stored payload.
            cookie = self.responseJson.pop('cookie')
            logger.debug('receiving remote task result, saving...')
            set_crawl_result(cookie, self.responseJson)
            self.task.pages_start = self.task.pages_end  # finished
            self.task.thread = None
            logger.debug("result: " + str(self.responseJson))
            # global_var.requester.set_req_state(self.client_address, self.request_map, True)
if __name__ == '__main__':
    # Manual smoke run: build two sample targets/payloads, start a
    # Requester, wait briefly and print the (never filled) result deque.
    address, address1 = ('127.0.0.1', 7798), ('127.0.0.1', 7799)
    my_request = dict(request='I am asking you...')
    my_request1 = dict(request1='I am asking you...')
    res = deque()

    requester = Requester()
    requester.start()
    # Dispatch calls are disabled. NOTE(review): Req reads task.request_map,
    # so these plain dicts would need wrapping in a task object first.
    # requester.get(address, my_request)
    # requester.get(address1, my_request1)
    sleep(2)
    print(res)