You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
dcs/dcs/tests/client.py

96 lines
2.6 KiB

# -*- coding: UTF-8 -*-
import struct
from threading import Thread
import socket
from json import JSONEncoder, JSONDecoder
import sys
# -------------------------------配置--------------------------------------------
# ------------------------------config--------------------------------------------
if len(sys.argv) < 2:
ip = "127.0.0.1" # server的ip
else:
ip = sys.argv[1]
port = 7777 # server的port
def read_bytes(s: 'socket.socket', size: 'int') -> 'bytes':
"""
从socket读取size个字节
:param s:套接字
:param size:要读取的大小
:return:读取的字节数,在遇到套接字关闭的情况下,返回的数据的长度可能小于 size
"""
data = ''.encode('utf-8')
while len(data) < size:
rsp_data = s.recv(size - len(data))
data += rsp_data
if len(rsp_data) == 0:
break
return data
def generate_request(request) -> 'bytes':
"""
根据传入的dict生成请求
请求包含 8字节头长度+头数据
:param request: dict
:return: bytes 请求数据
"""
request_bytes = JSONEncoder().encode(request).encode("utf-8")
return struct.pack("!Q", len(request_bytes)) + request_bytes
class Client(Thread):
def __init__(self, ip: str, port: int) -> None:
"""
:param ip: 服务器IP
:param port: 服务器端口
"""
super().__init__()
self.ip = ip
self.port = port
def test(self) -> 'int':
with socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) as socket_to_server:
socket_to_server.connect((self.ip, self.port))
request = dict()
request['action'] = 'test'
full_request = generate_request(request)
socket_to_server.sendall(full_request)
responseJson = JSONDecoder().decode(
read_bytes(socket_to_server, struct.unpack('!Q', socket_to_server.recv(8))[0]).decode(
"utf-8"))
return responseJson['test']
def end(self):
"""
结束通信
:return:
"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP) as socket_to_server:
socket_to_server.connect((self.ip, self.port))
request = dict()
request['action'] = 'end'
full_request = generate_request(request)
socket_to_server.sendall(full_request)
print("end communication!")
def run(self) -> None:
self.test()
self.end()
download_task = Client(ip, port)
download_task.daemon = True
download_task.start()
download_task.join()