import java.io.*;
import java.net.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;

/**
 * Peer-to-peer chat program over TCP.
 *
 * <p>Every instance listens for incoming connections and can also dial out to other
 * instances, so it acts as server and client at the same time.
 *
 * <p>Wire format: newline-terminated UTF-8 text lines, each starting with one of
 * <ul>
 *   <li>{@code USERNAME:<name>} - handshake announcing the sender's user name,</li>
 *   <li>{@code MESSAGE:<text>} - a chat line to display,</li>
 *   <li>{@code FILE:<name>:<size>} - followed immediately by exactly {@code size} raw bytes.</li>
 * </ul>
 */
public class P2PChat {
    private static final int DEFAULT_PORT = 8888;
    private static final int BUFFER_SIZE = 4096;
    private static final String USERNAME_PREFIX = "USERNAME:";
    private static final String MESSAGE_PREFIX = "MESSAGE:";
    private static final String FILE_PREFIX = "FILE:";

    private ServerSocket serverSocket;
    /** Peers that completed the handshake, keyed by the user name they announced. */
    private final Map<String, ClientHandler> connectedPeers = new ConcurrentHashMap<>();
    /** Every live connection, including ones that have not announced a user name yet. */
    private final Set<ClientHandler> activeHandlers = ConcurrentHashMap.newKeySet();
    private final ExecutorService threadPool = Executors.newCachedThreadPool();
    private volatile boolean running = true;
    private final String username;

    /**
     * Binds the listening socket and prints a start-up banner.
     * Terminates the JVM with status 1 when the port cannot be bound.
     *
     * @param username name announced to other peers
     * @param port     TCP port to listen on (0 lets the OS choose one)
     */
    public P2PChat(String username, int port) {
        this.username = username;
        try {
            serverSocket = new ServerSocket(port);
        } catch (IOException e) {
            System.err.println("启动失败: " + e.getMessage());
            System.exit(1);
        }
        // A failed host-name lookup must not abort a program whose port is already bound.
        String localIp;
        try {
            localIp = InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            localIp = "未知";
        }
        System.out.println("=== P2P聊天程序启动成功 ===");
        System.out.println("用户名: " + username);
        System.out.println("监听端口: " + serverSocket.getLocalPort());
        System.out.println("本机IP: " + localIp);
        System.out.println("========================");
    }

    /** Starts the background accept loop; each accepted connection gets its own handler task. */
    public void startServer() {
        threadPool.execute(() -> {
            while (running) {
                try {
                    Socket clientSocket = serverSocket.accept();
                    ClientHandler handler = new ClientHandler(clientSocket);
                    try {
                        threadPool.execute(handler);
                    } catch (RejectedExecutionException e) {
                        // The pool was shut down between accept() and execute().
                        handler.close();
                        break;
                    }
                } catch (IOException e) {
                    if (running) {
                        System.err.println("接受连接失败: " + e.getMessage());
                    }
                    if (serverSocket.isClosed()) {
                        break; // accept() can never succeed again; do not spin
                    }
                }
            }
        });
    }

    /**
     * Connects to a remote peer in the background and announces our user name to it.
     *
     * @param host remote host name or IP address
     * @param port remote TCP port
     */
    public void connectToPeer(String host, int port) {
        threadPool.execute(() -> {
            try {
                Socket socket = new Socket(host, port);
                ClientHandler handler = new ClientHandler(socket);
                if (!handler.isReady()) {
                    return; // the constructor already reported the failure
                }
                // Announce ourselves first; the peer's reply registers it on our side.
                handler.introduce();
                threadPool.execute(handler);
                System.out.println("成功连接到 " + host + ":" + port);
            } catch (IOException | IllegalArgumentException e) {
                // IllegalArgumentException: port outside 0..65535
                System.err.println("连接失败: " + e.getMessage());
            }
        });
    }

    /**
     * Sends a chat message, prefixed with our user name, to every registered peer.
     *
     * @param message text to send
     */
    public void broadcastMessage(String message) {
        String fullMessage = "[" + username + "]: " + message;
        for (ClientHandler handler : connectedPeers.values()) {
            handler.sendMessage(MESSAGE_PREFIX + fullMessage);
        }
    }

    /**
     * Sends a file to one registered peer.
     *
     * @param peerId   user name the peer announced
     * @param filePath path of the local file to send
     */
    public void sendFile(String peerId, String filePath) {
        ClientHandler handler = connectedPeers.get(peerId);
        if (handler != null) {
            handler.sendFile(filePath);
        } else {
            System.out.println("未找到对等端: " + peerId);
        }
    }

    /** Prints the user names of all registered peers. */
    public void listPeers() {
        if (connectedPeers.isEmpty()) {
            System.out.println("当前没有连接的对等端");
        } else {
            System.out.println("=== 已连接的对等端 ===");
            for (String peerId : connectedPeers.keySet()) {
                System.out.println("- " + peerId);
            }
        }
    }

    /** Stops accepting connections, closes every open connection and shuts the thread pool down. */
    public void shutdown() {
        running = false;
        // Includes connections that never finished the handshake; their reader threads
        // would otherwise stay blocked on the socket.
        for (ClientHandler handler : activeHandlers) {
            handler.close();
        }
        boolean closedCleanly = true;
        try {
            serverSocket.close();
        } catch (IOException e) {
            closedCleanly = false;
            System.err.println("关闭失败: " + e.getMessage());
        }
        threadPool.shutdown(); // runs even when closing the server socket failed
        if (closedCleanly) {
            System.out.println("程序已关闭");
        }
    }

    /**
     * Reads one protocol line from {@code in} without consuming any byte after the
     * line terminator, so raw file bytes that follow a header stay in the stream.
     * Package-private for testing.
     *
     * @param in stream to read from
     * @return the line without its {@code \n} / {@code \r\n} terminator, decoded as UTF-8,
     *         or {@code null} when the stream ended before any byte was read
     * @throws IOException if reading fails
     */
    static String readLine(InputStream in) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        int b;
        while ((b = in.read()) != -1) {
            if (b == '\n') {
                return decodeLine(bytes);
            }
            bytes.write(b);
        }
        return bytes.size() == 0 ? null : decodeLine(bytes);
    }

    /** Decodes the collected bytes as UTF-8 and drops one trailing carriage return. */
    private static String decodeLine(ByteArrayOutputStream bytes) throws IOException {
        String line = bytes.toString("UTF-8");
        return line.endsWith("\r") ? line.substring(0, line.length() - 1) : line;
    }

    /**
     * Reduces a peer-supplied file name to its last path component so that a received
     * file cannot be written outside the working directory. Package-private for testing.
     *
     * @param name file name as received from the peer
     * @return the part after the last {@code /} or {@code \}; {@code "unnamed"} when that
     *         part is empty, {@code "."} or {@code ".."}
     */
    static String sanitizeFileName(String name) {
        int cut = Math.max(name.lastIndexOf('/'), name.lastIndexOf('\\'));
        String base = name.substring(cut + 1);
        if (base.isEmpty() || base.equals(".") || base.equals("..")) {
            return "unnamed";
        }
        return base;
    }

    /**
     * Parses a TCP port number. Package-private for testing.
     *
     * @param text user input, surrounding whitespace allowed
     * @return the port in the range 0..65535, or -1 when {@code text} is not a valid port
     */
    static int parsePort(String text) {
        try {
            int port = Integer.parseInt(text.trim());
            return (port >= 0 && port <= 65535) ? port : -1;
        } catch (NumberFormatException e) {
            return -1;
        }
    }

    /** Serves one TCP connection: reads protocol lines in {@link #run()} and writes on demand. */
    class ClientHandler implements Runnable {
        private final Socket socket;
        // Lines and file bytes share ONE buffered stream per direction. A separate
        // Reader would read ahead and swallow file bytes that follow a FILE header.
        private final InputStream in;   // null when the socket streams could not be opened
        private final OutputStream out; // null when the socket streams could not be opened
        /** Serialises writers so a message can never land inside another thread's file body. */
        private final Object writeLock = new Object();
        private boolean introduced;     // guarded by writeLock
        private volatile String peerId;

        /**
         * Wraps the socket's streams. On failure the error is printed, the socket is
         * closed and the handler stays inert ({@link #run()} returns immediately).
         *
         * @param socket connected socket to serve
         */
        public ClientHandler(Socket socket) {
            this.socket = socket;
            InputStream input = null;
            OutputStream output = null;
            try {
                input = new BufferedInputStream(socket.getInputStream());
                output = new BufferedOutputStream(socket.getOutputStream());
            } catch (IOException e) {
                input = null;
                output = null;
                System.err.println("初始化连接失败: " + e.getMessage());
                try {
                    socket.close();
                } catch (IOException ignored) {
                    // best effort: the connection is unusable either way
                }
            }
            this.in = input;
            this.out = output;
            if (input != null) {
                activeHandlers.add(this);
            }
        }

        /** Returns whether the socket streams were opened successfully. */
        boolean isReady() {
            return in != null;
        }

        /** Reads and dispatches protocol lines until the peer disconnects, then closes the connection. */
        @Override
        public void run() {
            if (!isReady()) {
                return;
            }
            try {
                String message;
                while ((message = readLine(in)) != null) {
                    handleMessage(message);
                }
            } catch (IOException e) {
                System.out.println("连接断开: " + (peerId != null ? peerId : "未知"));
            } finally {
                close();
            }
        }

        /** Dispatches one received line by its protocol prefix; unknown lines are ignored. */
        private void handleMessage(String message) {
            if (message.startsWith(USERNAME_PREFIX)) {
                registerPeer(message.substring(USERNAME_PREFIX.length()));
            } else if (message.startsWith(MESSAGE_PREFIX)) {
                System.out.println(message.substring(MESSAGE_PREFIX.length()));
            } else if (message.startsWith(FILE_PREFIX)) {
                handleFileTransfer(message.substring(FILE_PREFIX.length()));
            }
        }

        /** Records the peer's announced name and answers with ours if we have not sent it yet. */
        private void registerPeer(String name) {
            String previous = peerId;
            if (!name.equals(previous)) {
                if (previous != null) {
                    connectedPeers.remove(previous, this); // the peer renamed itself
                }
                peerId = name;
                connectedPeers.put(name, this);
                System.out.println(name + " 已连接");
            }
            introduce();
        }

        /**
         * Sends our user name at most once per connection. Replying to every USERNAME
         * line would make two peers answer each other forever.
         */
        void introduce() {
            synchronized (writeLock) {
                if (introduced) {
                    return;
                }
                introduced = true;
                sendMessage(USERNAME_PREFIX + username);
            }
        }

        /**
         * Sends one protocol line to the peer. Failures are reported on stderr.
         *
         * @param message line content without a terminator
         */
        public void sendMessage(String message) {
            try {
                writeLine(message);
            } catch (IOException e) {
                System.err.println("消息发送失败: " + e.getMessage());
            }
        }

        /** Writes {@code message} plus {@code \n} as UTF-8 and flushes, holding the write lock. */
        private void writeLine(String message) throws IOException {
            if (out == null) {
                throw new IOException("连接未初始化");
            }
            byte[] line = (message + "\n").getBytes(StandardCharsets.UTF_8);
            synchronized (writeLock) {
                out.write(line);
                out.flush();
            }
        }

        /**
         * Sends a {@code FILE:<name>:<size>} header followed by exactly {@code size} bytes
         * of the file. If sending fails after the header went out, the connection is
         * closed because the peer can no longer tell file bytes from protocol lines.
         *
         * @param filePath path of the local file to send
         */
        public void sendFile(String filePath) {
            File file = new File(filePath);
            if (!file.exists()) {
                System.out.println("文件不存在: " + filePath);
                return;
            }
            boolean headerSent = false;
            // Open the file BEFORE announcing it, so an unreadable file never leaves the
            // peer waiting for bytes that will not arrive.
            try (InputStream fileIn = new FileInputStream(file)) {
                long size = file.length();
                synchronized (writeLock) {
                    writeLine(FILE_PREFIX + file.getName() + ":" + size);
                    headerSent = true;
                    byte[] buffer = new byte[BUFFER_SIZE];
                    long remaining = size;
                    while (remaining > 0) {
                        int wanted = (int) Math.min(buffer.length, remaining);
                        int bytesRead = fileIn.read(buffer, 0, wanted);
                        if (bytesRead == -1) {
                            throw new EOFException("文件在发送过程中被截断");
                        }
                        out.write(buffer, 0, bytesRead);
                        remaining -= bytesRead;
                    }
                    out.flush();
                }
                System.out.println("文件发送成功: " + file.getName());
            } catch (IOException e) {
                System.err.println("文件发送失败: " + e.getMessage());
                if (headerSent) {
                    close();
                }
            }
        }

        /**
         * Receives the body announced by a {@code <name>:<size>} header into
         * {@code received_<name>} in the working directory. A malformed header or a failed
         * transfer closes the connection, since the stream position is then unknown.
         *
         * @param fileInfo header content after the {@code FILE:} prefix
         */
        private void handleFileTransfer(String fileInfo) {
            // The size is the LAST field; the file name itself may contain ':'.
            int separator = fileInfo.lastIndexOf(':');
            long fileSize = -1;
            if (separator != -1) {
                try {
                    fileSize = Long.parseLong(fileInfo.substring(separator + 1).trim());
                } catch (NumberFormatException ignored) {
                    // leaves fileSize at -1, reported below
                }
            }
            if (fileSize < 0) {
                System.err.println("文件接收失败: 无效的文件头: " + fileInfo);
                close();
                return;
            }

            File file = new File("received_" + sanitizeFileName(fileInfo.substring(0, separator)));
            try (OutputStream fileOut = new FileOutputStream(file)) {
                byte[] buffer = new byte[BUFFER_SIZE];
                long remaining = fileSize;
                while (remaining > 0) {
                    // Never request more than what is left, or the next message is eaten.
                    int wanted = (int) Math.min(buffer.length, remaining);
                    int bytesRead = in.read(buffer, 0, wanted);
                    if (bytesRead == -1) {
                        throw new EOFException("连接在文件传输完成前关闭");
                    }
                    fileOut.write(buffer, 0, bytesRead);
                    remaining -= bytesRead;
                }
                System.out.println("文件接收成功: " + file.getName());
            } catch (IOException e) {
                System.err.println("文件接收失败: " + e.getMessage());
                close();
            }
        }

        /** Unregisters this handler and closes its socket; safe to call more than once. */
        public void close() {
            String id = peerId;
            if (id != null) {
                // Two-argument remove: never evict another connection that uses the same name.
                connectedPeers.remove(id, this);
            }
            activeHandlers.remove(this);
            try {
                socket.close();
            } catch (IOException ignored) {
                // best effort: nothing useful can be done if closing fails
            }
        }
    }

    /**
     * Interactive entry point: asks for a user name and listening port, then runs the
     * command loop until {@code quit} or end of input.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Scanner scanner = new Scanner(System.in);

        System.out.print("请输入用户名: ");
        if (!scanner.hasNextLine()) {
            return;
        }
        String username = scanner.nextLine();

        System.out.print("请输入监听端口 (默认8888): ");
        String portInput = scanner.hasNextLine() ? scanner.nextLine().trim() : "";
        int port = portInput.isEmpty() ? DEFAULT_PORT : parsePort(portInput);
        if (port < 0) {
            System.err.println("无效的端口: " + portInput);
            System.exit(1);
        }

        P2PChat chat = new P2PChat(username, port);
        chat.startServer();

        // Command-line interface
        System.out.println("\n可用命令:");
        System.out.println("  connect <IP> <端口> - 连接到对等端");
        System.out.println("  send <消息> - 发送消息");
        System.out.println("  file <对等端> <文件路径> - 发送文件");
        System.out.println("  list - 列出连接的对等端");
        System.out.println("  quit - 退出程序\n");

        while (true) {
            System.out.print("> ");
            if (!scanner.hasNextLine()) {
                // End of input (Ctrl-D or closed pipe) is treated like "quit".
                chat.shutdown();
                System.exit(0);
            }
            String input = scanner.nextLine().trim();
            if (input.isEmpty()) {
                continue;
            }

            String[] parts = input.split("\\s+", 3);
            String command = parts[0].toLowerCase(Locale.ROOT);

            switch (command) {
                case "connect":
                    if (parts.length >= 3) {
                        int peerPort = parsePort(parts[2]);
                        if (peerPort < 0) {
                            System.out.println("无效的端口: " + parts[2]);
                        } else {
                            chat.connectToPeer(parts[1], peerPort);
                        }
                    } else {
                        System.out.println("用法: connect <IP> <端口>");
                    }
                    break;
                case "send":
                    if (parts.length >= 2) {
                        // Everything after the command word, whatever whitespace separated it.
                        chat.broadcastMessage(input.substring(parts[0].length()).trim());
                    } else {
                        System.out.println("用法: send <消息>");
                    }
                    break;
                case "file":
                    if (parts.length >= 3) {
                        chat.sendFile(parts[1], parts[2]);
                    } else {
                        System.out.println("用法: file <对等端> <文件路径>");
                    }
                    break;
                case "list":
                    chat.listPeers();
                    break;
                case "quit":
                    chat.shutdown();
                    scanner.close();
                    System.exit(0);
                    break;
                default:
                    System.out.println("未知命令: " + command);
            }
        }
    }
}