添加p2p功能

main
杨博文 2 months ago
parent 79597ff6b0
commit 4dc01292ac

@ -132,6 +132,10 @@ class ConnectionManager:
self._lan_socket: Optional[socket.socket] = None
self._lan_listen_port: int = 0
# P2P TCP监听
self._p2p_server: Optional[asyncio.AbstractServer] = None
self._p2p_listen_port: int = 0
logger.info("ConnectionManager initialized")
# ==================== 状态管理 ====================
@ -286,6 +290,9 @@ class ConnectionManager:
# 启动LAN监听
await self._start_lan_listener()
# 启动P2P TCP监听器
await self._start_p2p_server()
logger.info(f"Connected to server {self.config.server_host}:{self.config.server_port}")
return True
@ -367,6 +374,16 @@ class ConnectionManager:
pass
self._lan_socket = None
# 关闭P2P TCP服务器
if self._p2p_server:
try:
self._p2p_server.close()
await self._p2p_server.wait_closed()
except Exception:
pass
self._p2p_server = None
self._p2p_listen_port = 0
self._set_state(ConnectionState.DISCONNECTED, "Disconnected")
logger.info("Disconnected from all connections")
@ -521,6 +538,7 @@ class ConnectionManager:
发送消息到指定对等端
根据连接模式选择通过服务器中转或直接P2P发送
优先使用P2P直连如果没有则尝试建立失败则使用服务器中转
Args:
peer_id: 目标对等端ID
@ -541,7 +559,25 @@ class ConnectionManager:
return True
except Exception as e:
logger.error(f"Failed to send P2P message to {peer_id}: {e}")
# P2P发送失败尝试通过服务器中转
# P2P连接可能已断开移除它
await self._close_p2p_connection(peer_id)
# 尝试建立P2P连接如果对方在局域网内
if peer_id in self._discovered_peers and peer_id not in self._peer_connections:
logger.info(f"Attempting to establish P2P connection with {peer_id}")
if await self.establish_p2p_connection(peer_id):
# P2P连接建立成功重试发送
if peer_id in self._peer_connections:
conn = self._peer_connections[peer_id]
try:
data = self._message_handler.serialize(message)
conn.writer.write(data)
await conn.writer.drain()
conn.last_activity = time.time()
logger.debug(f"Message sent to {peer_id} via newly established P2P")
return True
except Exception as e:
logger.error(f"Failed to send via new P2P connection: {e}")
# 通过服务器中转
if self._server_connected:
@ -574,12 +610,12 @@ class ConnectionManager:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setblocking(False)
# 构建发现请求
# 构建发现请求 - 包含P2P TCP端口
discovery_data = {
"magic": self.LAN_DISCOVERY_MAGIC.decode('utf-8'),
"user_id": self._user_id,
"username": self._user_info.username if self._user_info else "",
"port": self._lan_listen_port
"port": self._p2p_listen_port # 使用P2P TCP端口
}
request_data = json.dumps(discovery_data).encode('utf-8')
@ -686,23 +722,47 @@ class ConnectionManager:
if peer_id == self._user_id:
continue
# 发送响应
# 发送响应 - 返回P2P TCP端口
response_data = {
"magic": self.LAN_RESPONSE_MAGIC.decode('utf-8'),
"user_id": self._user_id,
"username": self._user_info.username if self._user_info else "",
"port": self._lan_listen_port
"port": self._p2p_listen_port # 使用P2P TCP端口
}
response = json.dumps(response_data).encode('utf-8')
await loop.sock_sendto(self._lan_socket, response, addr)
logger.debug(f"Responded to LAN discovery from {addr}")
logger.debug(f"Responded to LAN discovery from {addr}, P2P port: {self._p2p_listen_port}")
except asyncio.CancelledError:
break
except Exception as e:
logger.debug(f"LAN listen error: {e}")
continue
async def _start_p2p_server(self) -> None:
"""
启动P2P TCP服务器
监听来自局域网其他客户端的直接连接
"""
try:
# 尝试在随机端口启动TCP服务器
self._p2p_server = await asyncio.start_server(
self._handle_p2p_connection,
'0.0.0.0',
0 # 让系统分配端口
)
# 获取实际监听的端口
addr = self._p2p_server.sockets[0].getsockname()
self._p2p_listen_port = addr[1]
logger.info(f"P2P TCP server started on port {self._p2p_listen_port}")
except Exception as e:
logger.error(f"Failed to start P2P server: {e}")
self._p2p_listen_port = 0
# ==================== 通信模式选择 (需求 1.1, 1.2, 1.3) ====================

@ -210,6 +210,7 @@ class ChatWidget(QWidget):
file_send_requested = pyqtSignal(str) # peer_id
image_send_requested = pyqtSignal(str) # peer_id
voice_call_requested = pyqtSignal(str) # peer_id
voice_call_end_requested = pyqtSignal() # 挂断通话
def __init__(self, parent=None):
super().__init__(parent)
@ -217,6 +218,7 @@ class ChatWidget(QWidget):
self._current_peer_id: Optional[str] = None
self._current_peer_info: Optional[UserInfo] = None
self._messages: List[ChatMessage] = []
self._in_call: bool = False # 是否在通话中
self._setup_ui()
self._connect_signals()
@ -326,6 +328,30 @@ class ChatWidget(QWidget):
self._voice_btn.clicked.connect(self._on_voice_btn_clicked)
button_layout.addWidget(self._voice_btn)
# 挂断按钮(默认隐藏)
self._hangup_btn = QPushButton("📵")
self._hangup_btn.setFixedSize(30, 30)
self._hangup_btn.setToolTip("挂断通话")
self._hangup_btn.setStyleSheet("""
QPushButton {
background-color: #ff4444;
border-radius: 15px;
color: white;
}
QPushButton:hover {
background-color: #ff0000;
}
""")
self._hangup_btn.clicked.connect(self._on_hangup_btn_clicked)
self._hangup_btn.hide() # 默认隐藏
button_layout.addWidget(self._hangup_btn)
# 通话状态标签(默认隐藏)
self._call_status_label = QLabel("")
self._call_status_label.setStyleSheet("color: #4a90d9; font-weight: bold;")
self._call_status_label.hide()
button_layout.addWidget(self._call_status_label)
button_layout.addStretch()
layout.addLayout(button_layout)
@ -406,6 +432,11 @@ class ChatWidget(QWidget):
if self._current_peer_id:
self.voice_call_requested.emit(self._current_peer_id)
def _on_hangup_btn_clicked(self) -> None:
"""处理挂断按钮点击"""
self.voice_call_end_requested.emit()
self.set_call_state(False)
def _scroll_to_bottom(self) -> None:
"""滚动到底部"""
QTimer.singleShot(100, lambda: self._scroll_area.verticalScrollBar().setValue(
@ -534,3 +565,29 @@ class ChatWidget(QWidget):
def current_peer_id(self) -> Optional[str]:
"""获取当前聊天对象ID"""
return self._current_peer_id
def set_call_state(self, in_call: bool, status_text: str = "") -> None:
"""
设置通话状态
Args:
in_call: 是否在通话中
status_text: 状态文本"通话中...""正在呼叫..."
"""
self._in_call = in_call
if in_call:
self._voice_btn.hide()
self._hangup_btn.show()
self._call_status_label.setText(status_text or "通话中...")
self._call_status_label.show()
else:
self._voice_btn.show()
self._hangup_btn.hide()
self._call_status_label.hide()
self._call_status_label.setText("")
@property
def is_in_call(self) -> bool:
"""是否在通话中"""
return self._in_call

@ -27,6 +27,7 @@ class ContactItem(QListWidgetItem):
def __init__(self, user_info: UserInfo, parent: Optional[QListWidget] = None):
super().__init__(parent)
self.user_info = user_info
self._connection_mode: str = "relay" # 默认中转模式
self._update_display()
def _update_display(self) -> None:
@ -38,14 +39,23 @@ class ContactItem(QListWidgetItem):
UserStatus.AWAY: "🟡",
}
# 连接模式图标
mode_icon = "🔗" if self._connection_mode == "p2p" else ""
icon = status_icons.get(self.user_info.status, "")
display_name = self.user_info.display_name or self.user_info.username
self.setText(f"{icon} {display_name}")
if mode_icon:
self.setText(f"{icon} {display_name} {mode_icon}")
else:
self.setText(f"{icon} {display_name}")
# 设置提示信息
mode_text = "P2P直连" if self._connection_mode == "p2p" else "服务器中转"
self.setToolTip(
f"用户名: {self.user_info.username}\n"
f"状态: {self.user_info.status.value}\n"
f"连接模式: {mode_text}\n"
f"ID: {self.user_info.user_id}"
)
@ -53,6 +63,11 @@ class ContactItem(QListWidgetItem):
"""更新用户状态"""
self.user_info.status = status
self._update_display()
def set_connection_mode(self, mode: str) -> None:
"""设置连接模式 ('p2p''relay')"""
self._connection_mode = mode
self._update_display()
class ContactListWidget(QWidget):
@ -68,6 +83,7 @@ class ContactListWidget(QWidget):
contact_selected = pyqtSignal(str) # 选中联系人时发出参数为user_id
contact_double_clicked = pyqtSignal(str) # 双击联系人时发出
refresh_requested = pyqtSignal() # 请求刷新联系人列表
lan_discovery_requested = pyqtSignal() # 请求局域网发现
def __init__(self, parent: Optional[QWidget] = None):
super().__init__(parent)
@ -108,6 +124,12 @@ class ContactListWidget(QWidget):
self._refresh_btn.clicked.connect(self._on_refresh_clicked)
search_layout.addWidget(self._refresh_btn)
self._lan_btn = QPushButton("📡")
self._lan_btn.setFixedSize(30, 30)
self._lan_btn.setToolTip("发现局域网用户")
self._lan_btn.clicked.connect(self._on_lan_discovery_clicked)
search_layout.addWidget(self._lan_btn)
layout.addWidget(search_frame)
# 联系人列表
@ -169,6 +191,10 @@ class ContactListWidget(QWidget):
"""处理刷新按钮点击"""
self.refresh_requested.emit()
def _on_lan_discovery_clicked(self) -> None:
"""处理局域网发现按钮点击"""
self.lan_discovery_requested.emit()
def _filter_contacts(self, text: str) -> None:
"""过滤联系人列表"""
text = text.lower()
@ -329,3 +355,15 @@ class ContactListWidget(QWidget):
"""
if user_id in self._contacts:
self._list_widget.setCurrentItem(self._contacts[user_id])
def update_connection_mode(self, user_id: str, mode: str) -> None:
"""
更新联系人的连接模式
Args:
user_id: 用户ID
mode: 连接模式 ('p2p' 'relay')
"""
if user_id in self._contacts:
self._contacts[user_id].set_connection_mode(mode)
logger.debug(f"Contact {user_id} connection mode updated to {mode}")

@ -293,11 +293,13 @@ class P2PChatGUI(QObject):
self._chat_widget.file_send_requested.connect(lambda peer_id: self._on_send_file())
self._chat_widget.image_send_requested.connect(lambda peer_id: self._on_send_image())
self._chat_widget.voice_call_requested.connect(lambda peer_id: self._on_voice_call())
self._chat_widget.voice_call_end_requested.connect(self._on_end_voice_call)
# 连接联系人列表信号
self._contact_list_widget.contact_selected.connect(self._on_contact_selected)
self._contact_list_widget.contact_double_clicked.connect(self._on_contact_double_clicked)
self._contact_list_widget.refresh_requested.connect(self._refresh_users)
self._contact_list_widget.lan_discovery_requested.connect(self._on_lan_discovery)
# 设置系统托盘
try:
@ -433,6 +435,9 @@ class P2PChatGUI(QObject):
logger.info(f"Voice call accepted by {sender}")
self.main_window._statusbar.showMessage(f"通话已接通 - {sender}", 5000)
self.main_window.show_notification("通话已接通", f"{sender} 的通话已建立")
# 更新UI状态为通话中
if hasattr(self, '_chat_widget') and self._chat_widget:
self._chat_widget.set_call_state(True, "通话中...")
elif message.msg_type == MessageType.VOICE_CALL_REJECT:
sender = message.sender_id
@ -440,12 +445,18 @@ class P2PChatGUI(QObject):
logger.info(f"Voice call rejected by {sender}: {reason}")
self.main_window._statusbar.showMessage(f"呼叫被拒绝: {reason}", 5000)
self.main_window.show_notification("呼叫被拒绝", f"{sender} 拒绝了你的通话请求")
# 恢复UI状态
if hasattr(self, '_chat_widget') and self._chat_widget:
self._chat_widget.set_call_state(False)
elif message.msg_type == MessageType.VOICE_CALL_END:
sender = message.sender_id
logger.info(f"Voice call ended by {sender}")
self.main_window._statusbar.showMessage("通话已结束", 3000)
self.main_window.show_notification("通话结束", f"{sender} 的通话已结束")
# 恢复UI状态
if hasattr(self, '_chat_widget') and self._chat_widget:
self._chat_widget.set_call_state(False)
def _on_user_list_updated(self, users: List[UserInfo]):
"""用户列表更新回调"""
@ -463,6 +474,9 @@ class P2PChatGUI(QObject):
]
self._contact_list_widget.set_contacts(other_users)
logger.info(f"Contact list updated: {len(other_users)} contacts")
# 更新连接模式显示
self._update_connection_modes()
def _on_state_changed(self, state: str, reason: str):
"""状态变化回调"""
@ -480,6 +494,48 @@ class P2PChatGUI(QObject):
if self.worker:
self.worker.refresh_users()
def _on_lan_discovery(self):
"""发现局域网用户"""
self.main_window._statusbar.showMessage("正在发现局域网用户...", 5000)
if self.worker and self.worker._loop and self.worker._running:
future = asyncio.run_coroutine_threadsafe(
self._discover_lan_peers_async(),
self.worker._loop
)
try:
peers = future.result(timeout=5.0)
if peers:
self.main_window._statusbar.showMessage(f"发现 {len(peers)} 个局域网用户", 3000)
self.main_window.show_notification("局域网发现", f"发现 {len(peers)} 个局域网用户")
# 更新联系人列表中的连接模式
self._update_connection_modes()
else:
self.main_window._statusbar.showMessage("未发现局域网用户", 3000)
except Exception as e:
logger.error(f"LAN discovery error: {e}")
self.main_window._statusbar.showMessage("局域网发现失败", 3000)
async def _discover_lan_peers_async(self):
"""异步发现局域网用户"""
if self.client:
return await self.client.discover_lan_peers()
return []
def _update_connection_modes(self):
"""更新联系人列表中的连接模式显示"""
if not self.client or not hasattr(self, '_contact_list_widget'):
return
# 获取所有连接模式
connection_manager = self.client.connection_manager
# 更新每个联系人的连接模式
for user_id in list(self._contact_list_widget._contacts.keys()):
mode = connection_manager.get_connection_mode(user_id)
mode_str = "p2p" if mode.value == "p2p" else "relay"
self._contact_list_widget.update_connection_mode(user_id, mode_str)
def _on_send_file(self):
"""发送文件"""
from PyQt6.QtWidgets import QFileDialog, QMessageBox
@ -564,6 +620,11 @@ class P2PChatGUI(QObject):
return
self.main_window._statusbar.showMessage(f"正在呼叫 {self._current_chat_peer}...")
# 更新UI状态为呼叫中
if hasattr(self, '_chat_widget') and self._chat_widget:
self._chat_widget.set_call_state(True, "正在呼叫...")
if self.worker:
success = self.worker.start_voice_call(self._current_chat_peer)
if success:
@ -571,6 +632,20 @@ class P2PChatGUI(QObject):
else:
self.main_window._statusbar.showMessage("呼叫失败", 3000)
QMessageBox.warning(self.main_window, "呼叫失败", "无法发起语音通话,请检查网络连接")
# 恢复UI状态
if hasattr(self, '_chat_widget') and self._chat_widget:
self._chat_widget.set_call_state(False)
def _on_end_voice_call(self):
"""结束语音通话"""
logger.info("Ending voice call from GUI")
if self.worker:
self.worker.end_voice_call()
self.main_window._statusbar.showMessage("通话已结束", 3000)
# 更新UI状态
if hasattr(self, '_chat_widget') and self._chat_widget:
self._chat_widget.set_call_state(False)
def _show_incoming_call_dialog(self, caller_id: str):
"""显示来电对话框"""
@ -599,6 +674,9 @@ class P2PChatGUI(QObject):
if success:
self.main_window._statusbar.showMessage("通话已接通", 3000)
logger.info(f"Call accepted successfully with {caller_id}")
# 更新UI状态为通话中
if hasattr(self, '_chat_widget') and self._chat_widget:
self._chat_widget.set_call_state(True, "通话中...")
else:
self.main_window._statusbar.showMessage("接听失败", 3000)
logger.error(f"Failed to accept call from {caller_id}")

Loading…
Cancel
Save