ADD file via upload

main
p9x5lkiwf 2 weeks ago
parent d0151499a5
commit 28da29f021

@ -0,0 +1,289 @@
"""
信令服务器
负责帮助客户端找到对方的连接信息IP + 端口
不传输实际文件只传输元信息
"""
import socket
import threading
import json
import time
from typing import Dict, Optional
class SignalingServer:
"""信令服务器类"""
def __init__(self, host: str = '0.0.0.0', port: int = 9999):
"""
初始化信令服务器
Args:
host: 监听地址默认 0.0.0.0监听所有网卡
port: 监听端口默认 9999
"""
self.host = host
self.port = port
self.server_socket: Optional[socket.socket] = None
self.running = False
# 在线用户字典:{user_id: {"ip": "x.x.x.x", "port": xxxx, "last_seen": timestamp}}
self.online_users: Dict[str, dict] = {}
self.users_lock = threading.Lock()
def start(self):
"""启动信令服务器"""
try:
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(5)
# 设置超时,防止 accept() 无限阻塞
self.server_socket.settimeout(1.0)
self.running = True
print(f"[信令服务器] 启动成功,监听 {self.host}:{self.port}")
print(f"[信令服务器] 等待客户端连接...")
# 启动心跳检测线程(可选)
heartbeat_thread = threading.Thread(target=self._heartbeat_checker, daemon=True)
heartbeat_thread.start()
# 主循环:接受客户端连接
while self.running:
try:
client_socket, client_address = self.server_socket.accept()
print(f"[信令服务器] 新连接来自 {client_address}")
# 为每个客户端创建处理线程
client_thread = threading.Thread(
target=self._handle_client,
args=(client_socket, client_address),
daemon=True
)
client_thread.start()
except socket.timeout:
# 超时是正常的,继续循环以检查 running 标志
continue
except Exception as e:
if self.running:
print(f"[信令服务器] 接受连接时出错: {e}")
except Exception as e:
print(f"[信令服务器] 启动失败: {e}")
self.running = False
def _handle_client(self, client_socket: socket.socket, client_address: tuple):
"""
处理客户端请求
Args:
client_socket: 客户端套接字
client_address: 客户端地址
"""
try:
# 接收请求数据
data = client_socket.recv(4096).decode('utf-8')
if not data:
return
request = json.loads(data)
request_type = request.get('type')
print(f"[信令服务器] 收到请求: {request_type} 来自 {client_address}")
# 根据请求类型分发处理
if request_type == 'register':
response = self._handle_register(request, client_address)
elif request_type == 'query':
response = self._handle_query(request)
elif request_type == 'unregister':
response = self._handle_unregister(request)
elif request_type == 'list':
response = self._handle_list()
else:
response = {
'status': 'error',
'message': f'未知的请求类型: {request_type}'
}
# 发送响应
client_socket.send(json.dumps(response).encode('utf-8'))
except json.JSONDecodeError:
error_response = {'status': 'error', 'message': '无效的 JSON 格式'}
client_socket.send(json.dumps(error_response).encode('utf-8'))
except Exception as e:
print(f"[信令服务器] 处理客户端请求时出错: {e}")
error_response = {'status': 'error', 'message': str(e)}
try:
client_socket.send(json.dumps(error_response).encode('utf-8'))
except:
pass
finally:
client_socket.close()
def _handle_register(self, request: dict, client_address: tuple) -> dict:
"""
处理用户注册请求
Args:
request: 请求数据 {"type": "register", "user_id": "alice", "listen_port": 8001}
client_address: 客户端地址
Returns:
响应数据
"""
user_id = request.get('user_id')
listen_port = request.get('listen_port')
if not user_id or not listen_port:
return {'status': 'error', 'message': '缺少 user_id 或 listen_port'}
# 获取客户端的真实 IP
client_ip = client_address[0]
with self.users_lock:
# 检查用户是否已经注册
if user_id in self.online_users:
print(f"[信令服务器] 用户 {user_id} 重复注册,更新信息")
# 注册或更新用户信息
self.online_users[user_id] = {
'ip': client_ip,
'port': listen_port,
'last_seen': time.time()
}
print(f"[信令服务器] 用户 {user_id} 注册成功: {client_ip}:{listen_port}")
print(f"[信令服务器] 当前在线用户: {list(self.online_users.keys())}")
return {
'status': 'success',
'message': '注册成功',
'user_id': user_id,
'ip': client_ip,
'port': listen_port
}
def _handle_query(self, request: dict) -> dict:
"""
处理查询对方信息的请求
Args:
request: 请求数据 {"type": "query", "target_user": "bob"}
Returns:
响应数据
"""
target_user = request.get('target_user')
if not target_user:
return {'status': 'error', 'message': '缺少 target_user'}
with self.users_lock:
if target_user not in self.online_users:
return {
'status': 'error',
'message': f'用户 {target_user} 不在线'
}
user_info = self.online_users[target_user]
print(f"[信令服务器] 查询用户 {target_user}: {user_info['ip']}:{user_info['port']}")
return {
'status': 'success',
'user_id': target_user,
'ip': user_info['ip'],
'port': user_info['port']
}
def _handle_unregister(self, request: dict) -> dict:
"""
处理用户注销请求
Args:
request: 请求数据 {"type": "unregister", "user_id": "alice"}
Returns:
响应数据
"""
user_id = request.get('user_id')
if not user_id:
return {'status': 'error', 'message': '缺少 user_id'}
with self.users_lock:
if user_id in self.online_users:
del self.online_users[user_id]
print(f"[信令服务器] 用户 {user_id} 已注销")
print(f"[信令服务器] 当前在线用户: {list(self.online_users.keys())}")
return {'status': 'success', 'message': '注销成功'}
else:
return {'status': 'error', 'message': f'用户 {user_id} 不存在'}
def _handle_list(self) -> dict:
"""
处理获取在线用户列表的请求
Returns:
响应数据
"""
with self.users_lock:
users = list(self.online_users.keys())
print(f"[信令服务器] 返回在线用户列表: {users}")
return {
'status': 'success',
'users': users,
'count': len(users)
}
def _heartbeat_checker(self):
"""
心跳检测线程定期清理超时用户超过 5 分钟未更新
"""
timeout = 300 # 5 分钟超时
while self.running:
time.sleep(60) # 每分钟检查一次
current_time = time.time()
with self.users_lock:
timeout_users = [
user_id for user_id, info in self.online_users.items()
if current_time - info['last_seen'] > timeout
]
for user_id in timeout_users:
del self.online_users[user_id]
print(f"[信令服务器] 用户 {user_id} 超时,已移除")
def stop(self):
"""停止信令服务器"""
print("[信令服务器] 正在关闭...")
self.running = False
if self.server_socket:
self.server_socket.close()
print("[信令服务器] 已关闭")
def main():
"""
启动信令服务器
"""
server = SignalingServer(host='0.0.0.0', port=9999)
try:
server.start()
except KeyboardInterrupt:
print("\n[信令服务器] 收到退出信号")
server.stop()
if __name__ == "__main__":
main()
Loading…
Cancel
Save