#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
P2P Chat Server launch script.

Usage:
    python run_server.py [--host HOST] [--port PORT]

Examples:
    python run_server.py
    python run_server.py --host 0.0.0.0 --port 8888
"""

import asyncio
import argparse
import logging
import signal
import sys
from typing import Optional

from server.relay_server import RelayServer
from config import load_server_config, ServerConfig

# Configure logging: every record goes both to stdout and to "server.log".
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('server.log', encoding='utf-8'),
    ],
)

logger = logging.getLogger(__name__)


class ServerRunner:
    """Own a RelayServer instance and drive its start/stop lifecycle."""

    def __init__(self, config: ServerConfig):
        """Store the configuration; the server itself is created in start().

        Args:
            config: Server configuration handed to RelayServer.
        """
        self.config = config
        self.server: Optional[RelayServer] = None
        # Created inside start() so the event belongs to the loop that is
        # actually running (on Python < 3.10 an Event built before
        # asyncio.run() binds to a different loop).
        self._shutdown_event: Optional[asyncio.Event] = None

    async def start(self) -> None:
        """Run the server until it finishes or a shutdown signal arrives.

        Server errors are logged, not re-raised. stop() is always awaited
        before this coroutine returns.
        """
        self.server = RelayServer(self.config)
        self._shutdown_event = asyncio.Event()

        # Install signal handlers (loop.add_signal_handler is Unix-only).
        if sys.platform != 'win32':
            loop = asyncio.get_running_loop()
            for sig in (signal.SIGTERM, signal.SIGINT):
                loop.add_signal_handler(sig, self._signal_handler)

        logger.info("=" * 50)
        logger.info("P2P Chat Server 启动中...")
        logger.info(f"监听地址: {self.config.host}:{self.config.port}")
        logger.info(f"最大连接数: {self.config.max_connections}")
        logger.info("=" * 50)

        # Race the server against the shutdown event. The signal handlers
        # replace the default SIGINT behaviour, so without this wait a
        # SIGINT/SIGTERM would only be logged and never stop the server.
        serve_task = asyncio.ensure_future(self.server.start())
        shutdown_task = asyncio.ensure_future(self._shutdown_event.wait())
        try:
            await asyncio.wait(
                {serve_task, shutdown_task},
                return_when=asyncio.FIRST_COMPLETED,
            )
            if serve_task.done():
                # Surface any exception raised by the server so it is logged.
                serve_task.result()
        except KeyboardInterrupt:
            logger.info("收到中断信号")
        except Exception as e:
            logger.error(f"服务器错误: {e}")
        finally:
            # Cancel whichever side of the race is still running, then
            # shut the server down.
            pending = [
                task for task in (serve_task, shutdown_task)
                if not task.done()
            ]
            for task in pending:
                task.cancel()
            await asyncio.gather(*pending, return_exceptions=True)
            await self.stop()

    async def stop(self) -> None:
        """Stop the server if one has been created."""
        if self.server:
            logger.info("正在关闭服务器...")
            await self.server.stop()
            logger.info("服务器已关闭")

    def _signal_handler(self) -> None:
        """Handle SIGTERM/SIGINT by requesting shutdown from start()."""
        logger.info("收到关闭信号")
        if self._shutdown_event is not None:
            self._shutdown_event.set()


def parse_args() -> argparse.Namespace:
    """Parse command-line arguments.

    Returns:
        Namespace with ``host``, ``port`` and ``max_connections`` (each
        ``None`` when not given on the command line) and ``debug`` (bool).
    """
    parser = argparse.ArgumentParser(
        description='P2P Chat Server',
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    parser.add_argument(
        '--host',
        type=str,
        default=None,
        help='服务器监听地址 (默认: 0.0.0.0)',
    )

    parser.add_argument(
        '--port',
        type=int,
        default=None,
        help='服务器监听端口 (默认: 8888)',
    )

    parser.add_argument(
        '--max-connections',
        type=int,
        default=None,
        help='最大连接数 (默认: 1000)',
    )

    parser.add_argument(
        '--debug',
        action='store_true',
        help='启用调试模式',
    )

    return parser.parse_args()


def main() -> None:
    """Entry point: parse arguments, build the configuration, run the server.

    Exits with status 1 if running the server raises an unexpected exception.
    """
    args = parse_args()

    # Set the log level.
    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)

    # Load the configuration.
    config = load_server_config()

    # Command-line arguments override the loaded configuration. Compare
    # against None so explicit falsy values (e.g. "--port 0") are honoured.
    if args.host is not None:
        config.host = args.host
    if args.port is not None:
        config.port = args.port
    if args.max_connections is not None:
        config.max_connections = args.max_connections

    # Run the server.
    runner = ServerRunner(config)

    try:
        asyncio.run(runner.start())
    except KeyboardInterrupt:
        logger.info("服务器被用户中断")
    except Exception as e:
        logger.error(f"启动失败: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()