You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

290 lines
10 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
信令服务器
负责帮助客户端找到对方的连接信息IP + 端口)
不传输实际文件,只传输元信息
"""
import socket
import threading
import json
import time
from typing import Dict, Optional
class SignalingServer:
"""信令服务器类"""
def __init__(self, host: str = '0.0.0.0', port: int = 9999):
"""
初始化信令服务器
Args:
host: 监听地址,默认 0.0.0.0(监听所有网卡)
port: 监听端口,默认 9999
"""
self.host = host
self.port = port
self.server_socket: Optional[socket.socket] = None
self.running = False
# 在线用户字典:{user_id: {"ip": "x.x.x.x", "port": xxxx, "last_seen": timestamp}}
self.online_users: Dict[str, dict] = {}
self.users_lock = threading.Lock()
def start(self):
"""启动信令服务器"""
try:
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(5)
# 设置超时,防止 accept() 无限阻塞
self.server_socket.settimeout(1.0)
self.running = True
print(f"[信令服务器] 启动成功,监听 {self.host}:{self.port}")
print(f"[信令服务器] 等待客户端连接...")
# 启动心跳检测线程(可选)
heartbeat_thread = threading.Thread(target=self._heartbeat_checker, daemon=True)
heartbeat_thread.start()
# 主循环:接受客户端连接
while self.running:
try:
client_socket, client_address = self.server_socket.accept()
print(f"[信令服务器] 新连接来自 {client_address}")
# 为每个客户端创建处理线程
client_thread = threading.Thread(
target=self._handle_client,
args=(client_socket, client_address),
daemon=True
)
client_thread.start()
except socket.timeout:
# 超时是正常的,继续循环以检查 running 标志
continue
except Exception as e:
if self.running:
print(f"[信令服务器] 接受连接时出错: {e}")
except Exception as e:
print(f"[信令服务器] 启动失败: {e}")
self.running = False
def _handle_client(self, client_socket: socket.socket, client_address: tuple):
"""
处理客户端请求
Args:
client_socket: 客户端套接字
client_address: 客户端地址
"""
try:
# 接收请求数据
data = client_socket.recv(4096).decode('utf-8')
if not data:
return
request = json.loads(data)
request_type = request.get('type')
print(f"[信令服务器] 收到请求: {request_type} 来自 {client_address}")
# 根据请求类型分发处理
if request_type == 'register':
response = self._handle_register(request, client_address)
elif request_type == 'query':
response = self._handle_query(request)
elif request_type == 'unregister':
response = self._handle_unregister(request)
elif request_type == 'list':
response = self._handle_list()
else:
response = {
'status': 'error',
'message': f'未知的请求类型: {request_type}'
}
# 发送响应
client_socket.send(json.dumps(response).encode('utf-8'))
except json.JSONDecodeError:
error_response = {'status': 'error', 'message': '无效的 JSON 格式'}
client_socket.send(json.dumps(error_response).encode('utf-8'))
except Exception as e:
print(f"[信令服务器] 处理客户端请求时出错: {e}")
error_response = {'status': 'error', 'message': str(e)}
try:
client_socket.send(json.dumps(error_response).encode('utf-8'))
except:
pass
finally:
client_socket.close()
def _handle_register(self, request: dict, client_address: tuple) -> dict:
"""
处理用户注册请求
Args:
request: 请求数据 {"type": "register", "user_id": "alice", "listen_port": 8001}
client_address: 客户端地址
Returns:
响应数据
"""
user_id = request.get('user_id')
listen_port = request.get('listen_port')
if not user_id or not listen_port:
return {'status': 'error', 'message': '缺少 user_id 或 listen_port'}
# 获取客户端的真实 IP
client_ip = client_address[0]
with self.users_lock:
# 检查用户是否已经注册
if user_id in self.online_users:
print(f"[信令服务器] 用户 {user_id} 重复注册,更新信息")
# 注册或更新用户信息
self.online_users[user_id] = {
'ip': client_ip,
'port': listen_port,
'last_seen': time.time()
}
print(f"[信令服务器] 用户 {user_id} 注册成功: {client_ip}:{listen_port}")
print(f"[信令服务器] 当前在线用户: {list(self.online_users.keys())}")
return {
'status': 'success',
'message': '注册成功',
'user_id': user_id,
'ip': client_ip,
'port': listen_port
}
def _handle_query(self, request: dict) -> dict:
"""
处理查询对方信息的请求
Args:
request: 请求数据 {"type": "query", "target_user": "bob"}
Returns:
响应数据
"""
target_user = request.get('target_user')
if not target_user:
return {'status': 'error', 'message': '缺少 target_user'}
with self.users_lock:
if target_user not in self.online_users:
return {
'status': 'error',
'message': f'用户 {target_user} 不在线'
}
user_info = self.online_users[target_user]
print(f"[信令服务器] 查询用户 {target_user}: {user_info['ip']}:{user_info['port']}")
return {
'status': 'success',
'user_id': target_user,
'ip': user_info['ip'],
'port': user_info['port']
}
def _handle_unregister(self, request: dict) -> dict:
"""
处理用户注销请求
Args:
request: 请求数据 {"type": "unregister", "user_id": "alice"}
Returns:
响应数据
"""
user_id = request.get('user_id')
if not user_id:
return {'status': 'error', 'message': '缺少 user_id'}
with self.users_lock:
if user_id in self.online_users:
del self.online_users[user_id]
print(f"[信令服务器] 用户 {user_id} 已注销")
print(f"[信令服务器] 当前在线用户: {list(self.online_users.keys())}")
return {'status': 'success', 'message': '注销成功'}
else:
return {'status': 'error', 'message': f'用户 {user_id} 不存在'}
def _handle_list(self) -> dict:
"""
处理获取在线用户列表的请求
Returns:
响应数据
"""
with self.users_lock:
users = list(self.online_users.keys())
print(f"[信令服务器] 返回在线用户列表: {users}")
return {
'status': 'success',
'users': users,
'count': len(users)
}
def _heartbeat_checker(self):
"""
心跳检测线程,定期清理超时用户(超过 5 分钟未更新)
"""
timeout = 300 # 5 分钟超时
while self.running:
time.sleep(60) # 每分钟检查一次
current_time = time.time()
with self.users_lock:
timeout_users = [
user_id for user_id, info in self.online_users.items()
if current_time - info['last_seen'] > timeout
]
for user_id in timeout_users:
del self.online_users[user_id]
print(f"[信令服务器] 用户 {user_id} 超时,已移除")
def stop(self):
"""停止信令服务器"""
print("[信令服务器] 正在关闭...")
self.running = False
if self.server_socket:
self.server_socket.close()
print("[信令服务器] 已关闭")
def main():
"""
启动信令服务器
"""
server = SignalingServer(host='0.0.0.0', port=9999)
try:
server.start()
except KeyboardInterrupt:
print("\n[信令服务器] 收到退出信号")
server.stop()
if __name__ == "__main__":
main()