You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

312 lines
10 KiB

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
P2P Chat Client 启动脚本
用法:
python run_client.py --username USERNAME [--server HOST] [--port PORT]
示例:
python run_client.py --username alice
python run_client.py --username bob --server 192.168.1.100 --port 8888
"""
import asyncio
import argparse
import logging
import sys
import uuid
from typing import Optional, List
from client.app import P2PClientApp
from client.connection_manager import ConnectionState
from shared.models import UserInfo, UserStatus, Message, MessageType
from config import load_client_config, ClientConfig
# Logging setup: INFO level, timestamped records, written to stdout.
_stdout_handler = logging.StreamHandler(sys.stdout)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[_stdout_handler],
)
logger = logging.getLogger(__name__)
class ClientRunner:
    """Interactive command-line driver around a ``P2PClientApp``.

    ``start`` creates the client, registers the message / connection-state /
    user-list callbacks and connects; ``run_interactive`` then reads slash
    commands from stdin until the session ends and stops the client.
    """

    # Command summary shared by the welcome banner and /help, so the two
    # listings cannot drift apart.
    _COMMAND_HELP = (
        " /list - 查看在线用户",
        " /msg <用户> <消息> - 发送消息",
        " /file <用户> <路径> - 发送文件",
        " /call <用户> - 发起语音通话",
        " /endcall - 结束通话",
        " /stats - 查看统计信息",
        " /refresh - 刷新用户列表",
        " /quit - 退出",
    )

    def __init__(self, config: ClientConfig, username: str, display_name: str = ""):
        """Store the session settings; no connection is made here.

        Args:
            config: Client configuration handed to ``P2PClientApp`` in ``start``.
            username: Name used for the user record and the input prompt.
            display_name: Name shown to others; falls back to ``username``
                when empty.
        """
        self.config = config
        self.username = username
        self.display_name = display_name or username
        self.client: Optional[P2PClientApp] = None
        self._running = False

    def _print_command_list(self) -> None:
        """Print one line per supported command."""
        for entry in self._COMMAND_HELP:
            print(entry)

    async def start(self) -> bool:
        """Create the client, register callbacks and connect to the server.

        Returns:
            True when ``client.start`` reports success (the welcome banner is
            printed and the runner is marked as running), False otherwise.
        """
        self.client = P2PClientApp(self.config)

        # A fresh random user id is generated on every start.
        user_info = UserInfo(
            user_id=str(uuid.uuid4()),
            username=self.username,
            display_name=self.display_name,
            status=UserStatus.ONLINE,
        )

        self.client.add_message_callback(self._on_message)
        self.client.add_state_callback(self._on_state_change)
        self.client.add_user_list_callback(self._on_user_list_update)

        logger.info("=" * 50)
        logger.info("P2P Chat Client 启动中...")
        logger.info(f"用户名: {self.username}")
        logger.info(f"服务器: {self.config.server_host}:{self.config.server_port}")
        logger.info("=" * 50)

        if not await self.client.start(user_info):
            logger.error("连接失败")
            return False

        self._running = True
        logger.info("连接成功!")
        print("\n" + "=" * 50)
        print("欢迎使用 P2P Chat!")
        print("命令列表:")
        self._print_command_list()
        print("=" * 50 + "\n")
        return True

    async def run_interactive(self) -> None:
        """Run the prompt loop, then stop the client.

        The loop ends when ``_running`` is cleared (``/quit`` or a disconnect
        callback), on end-of-input, or on ``KeyboardInterrupt``. Any other
        exception raised while reading or handling a command is logged and the
        loop continues.
        """
        loop = asyncio.get_running_loop()
        prompt = f"[{self.username}] > "
        try:
            while self._running:
                try:
                    # input() blocks, so it runs on the default executor to
                    # keep the event loop free for client callbacks.
                    line = await loop.run_in_executor(None, input, prompt)
                    command = line.strip()
                    if command:
                        await self._process_command(command)
                except (EOFError, KeyboardInterrupt):
                    break
                except Exception as e:
                    logger.error(f"命令处理错误: {e}")
        finally:
            # Also runs on task cancellation, so the client is always stopped.
            await self.stop()

    async def _process_command(self, line: str) -> None:
        """Parse one input line and execute the matching slash command.

        The line is split into at most three fields (command, target, rest),
        so the message or file path may itself contain spaces.

        Args:
            line: Raw command line; blank input is ignored.
        """
        parts = line.split(maxsplit=2)
        if not parts:
            return
        cmd = parts[0].lower()

        if cmd in ("/quit", "/exit"):
            self._running = False

        elif cmd == "/list":
            users = self.client.get_online_users()
            print(f"\n在线用户 ({len(users)}):")
            for user in users:
                print(f" - {user.display_name} ({user.username})")
            print()

        elif cmd == "/msg":
            if len(parts) < 3:
                print("用法: /msg <用户名> <消息>")
                return
            target, message = parts[1], parts[2]
            if await self.client.send_text_message(target, message):
                print(f"[发送给 {target}]: {message}")
            else:
                print("发送失败")

        elif cmd == "/file":
            if len(parts) < 3:
                print("用法: /file <用户名> <文件路径>")
                return
            target, file_path = parts[1], parts[2]
            print(f"正在发送文件: {file_path}")
            success = await self.client.send_file(
                target, file_path,
                # \r rewrites the same terminal line on each progress update.
                progress_callback=lambda p: print(f"\r进度: {p.percentage:.1f}%", end="")
            )
            print()  # terminate the in-place progress line
            if success:
                print("文件发送完成")
            else:
                print("文件发送失败")

        elif cmd == "/call":
            if len(parts) < 2:
                print("用法: /call <用户名>")
                return
            target = parts[1]
            if await self.client.start_voice_call(target):
                print(f"正在呼叫 {target}...")
            else:
                print("呼叫失败")

        elif cmd == "/endcall":
            # NOTE(review): called without await, unlike start_voice_call;
            # confirm end_voice_call is a plain (non-coroutine) method.
            self.client.end_voice_call()
            print("通话已结束")

        elif cmd == "/stats":
            stats = self.client.get_stats()
            print("\n统计信息:")
            print(f" 用户: {stats['user']}")
            print(f" 在线用户数: {stats['online_users']}")
            print(f" 聊天会话数: {stats['chat_sessions']}")
            print(f" 消息总数: {stats['total_messages']}")
            print(f" 活跃传输: {stats['active_transfers']}")
            # Show the flag as 是/否 (yes/no) so the line always has a value.
            print(f" 语音通话: {'是' if stats['voice_call_active'] else '否'}")
            print()

        elif cmd == "/refresh":
            await self.client.refresh_online_users()
            print("用户列表已刷新")

        elif cmd == "/help":
            print("\n命令列表:")
            self._print_command_list()
            print()

        else:
            print(f"未知命令: {cmd},输入 /help 查看帮助")

    async def stop(self) -> None:
        """Clear the running flag and stop the client if one was created."""
        self._running = False
        if self.client:
            await self.client.stop()
        logger.info("客户端已关闭")

    def _on_message(self, message: Message) -> None:
        """Print incoming text messages and voice-call requests.

        Other message types are ignored. The prompt is re-printed afterwards
        because the output interrupts the line the user is typing on.
        """
        if message.msg_type == MessageType.TEXT:
            # The payload comes from the network; replace undecodable bytes
            # instead of raising inside the callback.
            content = message.payload.decode('utf-8', errors='replace')
            print(f"\n[来自 {message.sender_id}]: {content}")
            print(f"[{self.username}] > ", end="", flush=True)
        elif message.msg_type == MessageType.VOICE_CALL_REQUEST:
            print(f"\n来电: {message.sender_id} 正在呼叫你...")
            # NOTE(review): /accept and /reject are advertised here but
            # _process_command has no branch for them.
            print("输入 /accept 接听 或 /reject 拒绝")
            print(f"[{self.username}] > ", end="", flush=True)

    def _on_state_change(self, state: ConnectionState, reason: Optional[str]) -> None:
        """Report connection-state changes; a disconnect ends the prompt loop."""
        if state == ConnectionState.DISCONNECTED:
            print(f"\n连接断开: {reason or '未知原因'}")
            # The loop only notices this once the pending input() returns.
            self._running = False
        elif state == ConnectionState.RECONNECTING:
            print("\n正在重连...")

    def _on_user_list_update(self, users: List[UserInfo]) -> None:
        """Log the size of the updated online-user list at DEBUG level."""
        logger.debug(f"用户列表更新: {len(users)} 用户在线")
def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the client's command-line options.

    Args:
        argv: Argument strings to parse. ``None`` (the default) lets argparse
            read ``sys.argv[1:]``, which matches the previous behaviour; an
            explicit list allows programmatic use.

    Returns:
        Namespace with ``username``, ``display_name``, ``server``, ``port``
        and ``debug``. ``server`` and ``port`` are ``None`` when not given.
    """
    parser = argparse.ArgumentParser(
        description='P2P Chat Client',
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        '--username', '-u',
        type=str,
        required=True,
        help='用户名 (必填)',
    )
    parser.add_argument(
        '--display-name', '-d',
        type=str,
        default="",
        help='显示名称 (默认与用户名相同)',
    )
    parser.add_argument(
        '--server', '-s',
        type=str,
        default=None,
        help='服务器地址 (默认: 127.0.0.1)',
    )
    parser.add_argument(
        '--port', '-p',
        type=int,
        default=None,
        help='服务器端口 (默认: 8888)',
    )
    parser.add_argument(
        '--debug',
        action='store_true',
        help='启用调试模式',
    )
    return parser.parse_args(argv)
async def main_async(args: argparse.Namespace) -> None:
    """Load the client configuration, apply CLI overrides and run the session.

    Args:
        args: Parsed command-line options; ``server`` and ``port`` override
            the loaded configuration when they were given.
    """
    config = load_client_config()

    # Compare against None rather than truthiness, so an explicitly supplied
    # value (e.g. ``--port 0``) is not silently replaced by the configured one.
    if args.server is not None:
        config.server_host = args.server
    if args.port is not None:
        config.server_port = args.port

    runner = ClientRunner(config, args.username, args.display_name)
    # Enter the prompt loop only if the connection succeeded.
    if await runner.start():
        await runner.run_interactive()
def main() -> None:
    """Script entry point: parse options, set the log level, run the client.

    Ctrl+C is logged as a user interruption. Any other exception is logged
    and the process exits with status 1.
    """
    options = parse_args()

    # --debug raises the root logger's verbosity to DEBUG.
    if options.debug:
        root_logger = logging.getLogger()
        root_logger.setLevel(logging.DEBUG)

    try:
        asyncio.run(main_async(options))
    except KeyboardInterrupt:
        logger.info("客户端被用户中断")
    except Exception as e:
        logger.error(f"启动失败: {e}")
        sys.exit(1)
# Run the client only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()