diff --git a/signaling_server.py b/signaling_server.py new file mode 100644 index 0000000..caa532a --- /dev/null +++ b/signaling_server.py @@ -0,0 +1,289 @@ +""" +信令服务器 +负责帮助客户端找到对方的连接信息(IP + 端口) +不传输实际文件,只传输元信息 +""" +import socket +import threading +import json +import time +from typing import Dict, Optional + + +class SignalingServer: + """信令服务器类""" + + def __init__(self, host: str = '0.0.0.0', port: int = 9999): + """ + 初始化信令服务器 + + Args: + host: 监听地址,默认 0.0.0.0(监听所有网卡) + port: 监听端口,默认 9999 + """ + self.host = host + self.port = port + self.server_socket: Optional[socket.socket] = None + self.running = False + + # 在线用户字典:{user_id: {"ip": "x.x.x.x", "port": xxxx, "last_seen": timestamp}} + self.online_users: Dict[str, dict] = {} + self.users_lock = threading.Lock() + + def start(self): + """启动信令服务器""" + try: + self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.server_socket.bind((self.host, self.port)) + self.server_socket.listen(5) + # 设置超时,防止 accept() 无限阻塞 + self.server_socket.settimeout(1.0) + self.running = True + + print(f"[信令服务器] 启动成功,监听 {self.host}:{self.port}") + print(f"[信令服务器] 等待客户端连接...") + + # 启动心跳检测线程(可选) + heartbeat_thread = threading.Thread(target=self._heartbeat_checker, daemon=True) + heartbeat_thread.start() + + # 主循环:接受客户端连接 + while self.running: + try: + client_socket, client_address = self.server_socket.accept() + print(f"[信令服务器] 新连接来自 {client_address}") + + # 为每个客户端创建处理线程 + client_thread = threading.Thread( + target=self._handle_client, + args=(client_socket, client_address), + daemon=True + ) + client_thread.start() + + except socket.timeout: + # 超时是正常的,继续循环以检查 running 标志 + continue + except Exception as e: + if self.running: + print(f"[信令服务器] 接受连接时出错: {e}") + + except Exception as e: + print(f"[信令服务器] 启动失败: {e}") + self.running = False + + def _handle_client(self, client_socket: socket.socket, client_address: tuple): + """ + 处理客户端请求 + + Args: + client_socket: 客户端套接字 + client_address: 客户端地址 + """ + try: + # 接收请求数据 + data = client_socket.recv(4096).decode('utf-8') + if not data: + return + + request = json.loads(data) + request_type = request.get('type') + + print(f"[信令服务器] 收到请求: {request_type} 来自 {client_address}") + + # 根据请求类型分发处理 + if request_type == 'register': + response = self._handle_register(request, client_address) + elif request_type == 'query': + response = self._handle_query(request) + elif request_type == 'unregister': + response = self._handle_unregister(request) + elif request_type == 'list': + response = self._handle_list() + else: + response = { + 'status': 'error', + 'message': f'未知的请求类型: {request_type}' + } + + # 发送响应 + client_socket.send(json.dumps(response).encode('utf-8')) + + except json.JSONDecodeError: + error_response = {'status': 'error', 'message': '无效的 JSON 格式'} + client_socket.send(json.dumps(error_response).encode('utf-8')) + except Exception as e: + print(f"[信令服务器] 处理客户端请求时出错: {e}") + error_response = {'status': 'error', 'message': str(e)} + try: + client_socket.send(json.dumps(error_response).encode('utf-8')) + except: + pass + finally: + client_socket.close() + + def _handle_register(self, request: dict, client_address: tuple) -> dict: + """ + 处理用户注册请求 + + Args: + request: 请求数据 {"type": "register", "user_id": "alice", "listen_port": 8001} + client_address: 客户端地址 + + Returns: + 响应数据 + """ + user_id = request.get('user_id') + listen_port = request.get('listen_port') + + if not user_id or not listen_port: + return {'status': 'error', 'message': '缺少 user_id 或 listen_port'} + + # 获取客户端的真实 IP + client_ip = client_address[0] + + with self.users_lock: + # 检查用户是否已经注册 + if user_id in self.online_users: + print(f"[信令服务器] 用户 {user_id} 重复注册,更新信息") + + # 注册或更新用户信息 + self.online_users[user_id] = { + 'ip': client_ip, + 'port': listen_port, + 'last_seen': time.time() + } + + print(f"[信令服务器] 用户 {user_id} 注册成功: {client_ip}:{listen_port}") + print(f"[信令服务器] 当前在线用户: {list(self.online_users.keys())}") + + return { + 'status': 'success', + 'message': '注册成功', + 'user_id': user_id, + 'ip': client_ip, + 'port': listen_port + } + + def _handle_query(self, request: dict) -> dict: + """ + 处理查询对方信息的请求 + + Args: + request: 请求数据 {"type": "query", "target_user": "bob"} + + Returns: + 响应数据 + """ + target_user = request.get('target_user') + + if not target_user: + return {'status': 'error', 'message': '缺少 target_user'} + + with self.users_lock: + if target_user not in self.online_users: + return { + 'status': 'error', + 'message': f'用户 {target_user} 不在线' + } + + user_info = self.online_users[target_user] + + print(f"[信令服务器] 查询用户 {target_user}: {user_info['ip']}:{user_info['port']}") + + return { + 'status': 'success', + 'user_id': target_user, + 'ip': user_info['ip'], + 'port': user_info['port'] + } + + def _handle_unregister(self, request: dict) -> dict: + """ + 处理用户注销请求 + + Args: + request: 请求数据 {"type": "unregister", "user_id": "alice"} + + Returns: + 响应数据 + """ + user_id = request.get('user_id') + + if not user_id: + return {'status': 'error', 'message': '缺少 user_id'} + + with self.users_lock: + if user_id in self.online_users: + del self.online_users[user_id] + print(f"[信令服务器] 用户 {user_id} 已注销") + print(f"[信令服务器] 当前在线用户: {list(self.online_users.keys())}") + return {'status': 'success', 'message': '注销成功'} + else: + return {'status': 'error', 'message': f'用户 {user_id} 不存在'} + + def _handle_list(self) -> dict: + """ + 处理获取在线用户列表的请求 + + Returns: + 响应数据 + """ + with self.users_lock: + users = list(self.online_users.keys()) + + print(f"[信令服务器] 返回在线用户列表: {users}") + + return { + 'status': 'success', + 'users': users, + 'count': len(users) + } + + def _heartbeat_checker(self): + """ + 心跳检测线程,定期清理超时用户(超过 5 分钟未更新) + """ + timeout = 300 # 5 分钟超时 + + while self.running: + time.sleep(60) # 每分钟检查一次 + + current_time = time.time() + with self.users_lock: + timeout_users = [ + user_id for user_id, info in self.online_users.items() + if current_time - info['last_seen'] > timeout + ] + + for user_id in timeout_users: + del self.online_users[user_id] + print(f"[信令服务器] 用户 {user_id} 超时,已移除") + + def stop(self): + """停止信令服务器""" + print("[信令服务器] 正在关闭...") + self.running = False + + if self.server_socket: + self.server_socket.close() + + print("[信令服务器] 已关闭") + + +def main(): + """ + 启动信令服务器 + """ + server = SignalingServer(host='0.0.0.0', port=9999) + + try: + server.start() + except KeyboardInterrupt: + print("\n[信令服务器] 收到退出信号") + server.stop() + + +if __name__ == "__main__": + main()