You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

298 lines
10 KiB

import java.io.*;
import java.net.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;
/**
 * Peer-to-peer chat program over TCP.
 *
 * <p>Every instance listens on a server socket and can also dial out to other
 * instances, so one process acts as both server and client.
 *
 * <p>Wire protocol: newline-terminated UTF-8 text lines.
 * <ul>
 *   <li>{@code USERNAME:<name>} - announces the sender's user name;</li>
 *   <li>{@code MESSAGE:<text>} - a chat line to print;</li>
 *   <li>{@code FILE:<name>:<size>} - followed by exactly {@code size} raw bytes.</li>
 * </ul>
 */
public class P2PChat {
    private static final int DEFAULT_PORT = 8888;
    private static final String USERNAME_PREFIX = "USERNAME:";
    private static final String MESSAGE_PREFIX = "MESSAGE:";
    private static final String FILE_PREFIX = "FILE:";
    /** Upper bound for one protocol line, so a peer cannot make us buffer without limit. */
    private static final int MAX_LINE_BYTES = 64 * 1024;
    private static final int BUFFER_SIZE = 4096;

    private final ServerSocket serverSocket;
    /** Peers that have announced a user name, keyed by that name. */
    private final Map<String, ClientHandler> connectedPeers = new ConcurrentHashMap<>();
    /** Every live connection, including ones that have not announced a name yet. */
    private final Set<ClientHandler> openHandlers = ConcurrentHashMap.newKeySet();
    private final ExecutorService threadPool = Executors.newCachedThreadPool();
    private volatile boolean running = true;
    private final String username;

    /**
     * Binds the listening socket and prints a start-up banner.
     *
     * <p>If the port cannot be bound the process exits with status 1. A failure to
     * resolve the local host address only affects the banner.
     *
     * @param username name announced to peers and prefixed to outgoing messages
     * @param port     TCP port to listen on
     */
    public P2PChat(String username, int port) {
        this.username = username;
        ServerSocket bound = null;
        try {
            bound = new ServerSocket(port);
        } catch (IOException e) {
            System.err.println("启动失败: " + e.getMessage());
            System.exit(1);
        }
        this.serverSocket = bound;
        System.out.println("=== P2P聊天程序启动成功 ===");
        System.out.println("用户名: " + username);
        System.out.println("监听端口: " + serverSocket.getLocalPort());
        System.out.println("本机IP: " + describeLocalAddress());
        System.out.println("========================");
    }

    /** Returns the port the server socket is actually bound to. */
    public int getPort() {
        return serverSocket.getLocalPort();
    }

    /** Starts the background accept loop; each accepted socket gets its own handler task. */
    public void startServer() {
        threadPool.execute(() -> {
            // Stop when the socket is closed as well, otherwise accept() would fail in a tight loop.
            while (running && !serverSocket.isClosed()) {
                Socket clientSocket = null;
                try {
                    clientSocket = serverSocket.accept();
                    startHandler(new ClientHandler(clientSocket));
                } catch (IOException e) {
                    closeQuietly(clientSocket);
                    if (running) {
                        System.err.println("接受连接失败: " + e.getMessage());
                    }
                }
            }
        });
    }

    /**
     * Connects to a remote peer in the background and announces our user name to it.
     *
     * @param host remote host name or IP address
     * @param port remote TCP port
     */
    public void connectToPeer(String host, int port) {
        threadPool.execute(() -> {
            Socket socket = null;
            try {
                socket = new Socket(host, port);
                ClientHandler handler = new ClientHandler(socket);
                // Announce first; the handler remembers this and will not answer the
                // peer's own announcement with yet another one.
                handler.announceSelf();
                startHandler(handler);
                System.out.println("成功连接到 " + host + ":" + port);
            } catch (IOException | IllegalArgumentException e) {
                closeQuietly(socket);
                System.err.println("连接失败: " + e.getMessage());
            }
        });
    }

    /**
     * Sends a chat message to every peer that has announced a user name.
     *
     * @param message text to send; it is prefixed with {@code [username]: }
     */
    public void broadcastMessage(String message) {
        String fullMessage = "[" + username + "]: " + message;
        for (ClientHandler handler : connectedPeers.values()) {
            handler.sendMessage(MESSAGE_PREFIX + fullMessage);
        }
    }

    /**
     * Sends a file to one peer.
     *
     * @param peerId   user name the peer announced
     * @param filePath path of the local file to send
     */
    public void sendFile(String peerId, String filePath) {
        ClientHandler handler = connectedPeers.get(peerId);
        if (handler != null) {
            handler.sendFile(filePath);
        } else {
            System.out.println("未找到对等端: " + peerId);
        }
    }

    /** Prints the names of all peers that have announced themselves. */
    public void listPeers() {
        if (connectedPeers.isEmpty()) {
            System.out.println("当前没有连接的对等端");
        } else {
            System.out.println("=== 已连接的对等端 ===");
            for (String peerId : connectedPeers.keySet()) {
                System.out.println("- " + peerId);
            }
        }
    }

    /** Stops accepting connections, closes every open connection and shuts the pool down. */
    public void shutdown() {
        running = false;
        try {
            serverSocket.close();
        } catch (IOException e) {
            // Keep going: the connections and the pool still have to be released.
            System.err.println("关闭失败: " + e.getMessage());
        }
        // Reject new tasks before closing handlers, so a handler created concurrently
        // is either closed by the loop below or by startHandler's rejection path.
        threadPool.shutdown();
        for (ClientHandler handler : openHandlers) {
            handler.close();
        }
        System.out.println("程序已关闭");
    }

    /** Runs a handler on the pool, closing it if the pool no longer accepts tasks. */
    private void startHandler(ClientHandler handler) {
        try {
            threadPool.execute(handler);
        } catch (RejectedExecutionException e) {
            handler.close();
        }
    }

    /** Returns the local host address for the banner, or a placeholder if it cannot be resolved. */
    private static String describeLocalAddress() {
        try {
            return InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            return "未知";
        }
    }

    /** Closes a socket, ignoring errors; accepts {@code null}. */
    private static void closeQuietly(Socket socket) {
        if (socket == null) {
            return;
        }
        try {
            socket.close();
        } catch (IOException ignored) {
            // Best effort: nothing useful can be done if close fails.
        }
    }

    /**
     * Reads one protocol line directly from {@code in} without consuming any byte
     * after the line terminator, so raw file bytes that follow stay in the stream.
     *
     * @param in stream to read from
     * @return the line without its {@code \n} or {@code \r\n} terminator, decoded as
     *         UTF-8, or {@code null} if the stream ended before any byte was read
     * @throws IOException if reading fails or the line exceeds {@link #MAX_LINE_BYTES}
     */
    static String readLine(InputStream in) throws IOException {
        ByteArrayOutputStream line = new ByteArrayOutputStream();
        int b;
        while ((b = in.read()) != -1) {
            if (b == '\n') {
                return decodeLine(line);
            }
            if (line.size() >= MAX_LINE_BYTES) {
                throw new IOException("protocol line exceeds " + MAX_LINE_BYTES + " bytes");
            }
            line.write(b);
        }
        return line.size() == 0 ? null : decodeLine(line);
    }

    /** Decodes collected line bytes as UTF-8, dropping one trailing carriage return. */
    private static String decodeLine(ByteArrayOutputStream raw) {
        byte[] bytes = raw.toByteArray();
        int length = bytes.length;
        if (length > 0 && bytes[length - 1] == '\r') {
            length--;
        }
        return new String(bytes, 0, length, StandardCharsets.UTF_8);
    }

    /**
     * Copies at most {@code count} bytes from {@code in} to {@code out}, never
     * reading more than {@code count} bytes from {@code in}.
     *
     * @return the number of bytes copied; less than {@code count} only if
     *         {@code in} reached end of stream first
     * @throws IOException if reading or writing fails
     */
    static long copyExactly(InputStream in, OutputStream out, long count) throws IOException {
        byte[] buffer = new byte[BUFFER_SIZE];
        long copied = 0;
        while (copied < count) {
            int wanted = (int) Math.min(buffer.length, count - copied);
            int read = in.read(buffer, 0, wanted);
            if (read == -1) {
                break;
            }
            out.write(buffer, 0, read);
            copied += read;
        }
        return copied;
    }

    /**
     * Reduces a peer-supplied file name to its last path component so it cannot
     * point outside the directory files are received into.
     *
     * @return the part after the last {@code /} or {@code \}, or {@code "unnamed"}
     *         if that part is empty, {@code "."} or {@code ".."}
     */
    static String sanitizeFileName(String name) {
        int cut = Math.max(name.lastIndexOf('/'), name.lastIndexOf('\\'));
        String base = name.substring(cut + 1);
        if (base.isEmpty() || base.equals(".") || base.equals("..")) {
            return "unnamed";
        }
        return base;
    }

    /**
     * Parses a TCP port number.
     *
     * @return the port in the range 0-65535, or {@code -1} if {@code text} is not
     *         a number in that range
     */
    static int parsePort(String text) {
        try {
            int port = Integer.parseInt(text.trim());
            return (port >= 0 && port <= 65535) ? port : -1;
        } catch (NumberFormatException e) {
            return -1;
        }
    }

    /** Handles one TCP connection to a peer: reads protocol lines and sends messages and files. */
    class ClientHandler implements Runnable {
        private final Socket socket;
        // One buffered stream serves both line and file reads, so buffering can
        // never swallow file bytes that follow a FILE header.
        private final InputStream in;
        private final OutputStream out;
        /** Serializes writers so a message cannot land in the middle of a file. */
        private final Object sendLock = new Object();
        /** Whether our USERNAME line was already sent; guarded by {@code sendLock}. */
        private boolean announced;
        private volatile String peerId;

        /**
         * Wraps an established socket.
         *
         * @param socket connected socket; the caller must close it if this constructor throws
         * @throws IOException if the socket's streams cannot be obtained
         */
        public ClientHandler(Socket socket) throws IOException {
            this.socket = socket;
            this.in = new BufferedInputStream(socket.getInputStream());
            this.out = new BufferedOutputStream(socket.getOutputStream());
            openHandlers.add(this);
        }

        /** Reads and dispatches protocol lines until the connection ends, then closes it. */
        @Override
        public void run() {
            try {
                String message;
                while ((message = readLine(in)) != null) {
                    handleMessage(message);
                }
            } catch (IOException e) {
                System.out.println("连接断开: " + (peerId != null ? peerId : "未知"));
            } finally {
                close();
            }
        }

        /** Dispatches one protocol line by its prefix; lines with an unknown prefix are ignored. */
        private void handleMessage(String message) throws IOException {
            if (message.startsWith(USERNAME_PREFIX)) {
                registerPeer(message.substring(USERNAME_PREFIX.length()));
            } else if (message.startsWith(MESSAGE_PREFIX)) {
                System.out.println(message.substring(MESSAGE_PREFIX.length()));
            } else if (message.startsWith(FILE_PREFIX)) {
                handleFileTransfer(message.substring(FILE_PREFIX.length()));
            }
        }

        /**
         * Records the peer's announced name and answers with ours if not sent yet.
         * Repeated announcements are ignored; answering each one would make two
         * peers echo USERNAME lines at each other forever.
         */
        private void registerPeer(String announcedName) {
            if (peerId != null) {
                return;
            }
            peerId = announcedName;
            // NOTE: a second peer announcing the same name replaces the first in the map.
            connectedPeers.put(announcedName, this);
            System.out.println(announcedName + " 已连接");
            announceSelf();
        }

        /** Sends our USERNAME line to the peer, at most once per connection. */
        void announceSelf() {
            synchronized (sendLock) {
                if (announced) {
                    return;
                }
                announced = true;
            }
            sendMessage(USERNAME_PREFIX + username);
        }

        /**
         * Sends one protocol line. On a write failure the error is reported and the
         * connection is closed.
         *
         * @param message line to send, without a terminator
         */
        public void sendMessage(String message) {
            try {
                synchronized (sendLock) {
                    writeLine(message);
                }
            } catch (IOException e) {
                System.err.println("发送失败: " + e.getMessage());
                close();
            }
        }

        /** Writes a UTF-8 line and flushes; the caller must hold {@code sendLock}. */
        private void writeLine(String line) throws IOException {
            out.write((line + "\n").getBytes(StandardCharsets.UTF_8));
            out.flush();
        }

        /**
         * Sends a FILE header followed by the file's bytes.
         *
         * @param filePath path of the local file to send
         */
        public void sendFile(String filePath) {
            File file = new File(filePath);
            if (!file.exists()) {
                System.out.println("文件不存在: " + filePath);
                return;
            }
            if (!file.isFile()) {
                System.out.println("不是普通文件: " + filePath);
                return;
            }
            // Open the file before sending the header: if it cannot be read, the peer
            // must not be left waiting for bytes that never arrive.
            final InputStream opened;
            try {
                opened = new FileInputStream(file);
            } catch (IOException e) {
                System.err.println("文件发送失败: " + e.getMessage());
                return;
            }
            try (InputStream source = opened) {
                long length = file.length();
                synchronized (sendLock) {
                    writeLine(FILE_PREFIX + file.getName() + ":" + length);
                    long sent = copyExactly(source, out, length);
                    out.flush();
                    if (sent < length) {
                        throw new IOException("file shrank while sending: " + file.getName());
                    }
                }
                System.out.println("文件发送成功: " + file.getName());
            } catch (IOException e) {
                System.err.println("文件发送失败: " + e.getMessage());
                // The header may already be out, so the peer's view of the stream is
                // unknown; dropping the connection is the only safe recovery.
                close();
            }
        }

        /**
         * Receives the bytes announced by a FILE header into {@code received_<name>}.
         *
         * @param fileInfo header payload in the form {@code <name>:<size>}
         * @throws IOException if the header is malformed or the transfer fails; the
         *         stream position is then unknown, so the caller drops the connection
         */
        private void handleFileTransfer(String fileInfo) throws IOException {
            // Split on the last ':' because the file name itself may contain colons.
            int separator = fileInfo.lastIndexOf(':');
            long fileSize = -1;
            if (separator >= 0) {
                try {
                    fileSize = Long.parseLong(fileInfo.substring(separator + 1).trim());
                } catch (NumberFormatException ignored) {
                    // Reported below as a malformed header.
                }
            }
            if (fileSize < 0) {
                System.err.println("文件接收失败: 无效的文件信息 " + fileInfo);
                throw new IOException("malformed FILE header: " + fileInfo);
            }
            String fileName = sanitizeFileName(fileInfo.substring(0, separator));
            File file = new File("received_" + fileName);
            try (OutputStream fileOut = new FileOutputStream(file)) {
                long received = copyExactly(in, fileOut, fileSize);
                if (received < fileSize) {
                    throw new EOFException("received " + received + " of " + fileSize + " bytes");
                }
                System.out.println("文件接收成功: " + file.getName());
            } catch (IOException e) {
                System.err.println("文件接收失败: " + e.getMessage());
                throw e;
            }
        }

        /** Unregisters this handler and closes its socket; safe to call more than once. */
        public void close() {
            openHandlers.remove(this);
            String id = peerId;
            if (id != null) {
                // Two-argument remove: do not drop another handler registered under the same name.
                connectedPeers.remove(id, this);
            }
            closeQuietly(socket);
        }
    }

    /**
     * Command-line entry point: asks for a user name and port, then runs the
     * command loop until {@code quit} is entered or standard input ends.
     */
    public static void main(String[] args) {
        Scanner scanner = new Scanner(System.in);
        System.out.print("请输入用户名: ");
        if (!scanner.hasNextLine()) {
            return;
        }
        String username = scanner.nextLine();
        System.out.print("请输入监听端口 (默认8888): ");
        String portInput = scanner.hasNextLine() ? scanner.nextLine().trim() : "";
        int port = DEFAULT_PORT;
        if (!portInput.isEmpty()) {
            port = parsePort(portInput);
            if (port < 0) {
                System.err.println("无效的端口: " + portInput);
                System.exit(1);
            }
        }
        P2PChat chat = new P2PChat(username, port);
        chat.startServer();
        // Command-line interface
        System.out.println("\n可用命令:");
        System.out.println(" connect <IP> <端口> - 连接到对等端");
        System.out.println(" send <消息> - 发送消息");
        System.out.println(" file <对等端> <文件路径> - 发送文件");
        System.out.println(" list - 列出连接的对等端");
        System.out.println(" quit - 退出程序\n");
        boolean quit = false;
        while (!quit) {
            System.out.print("> ");
            if (!scanner.hasNextLine()) {
                break; // stdin closed: treat like "quit"
            }
            String input = scanner.nextLine().trim();
            if (input.isEmpty()) {
                continue;
            }
            String[] parts = input.split("\\s+", 3);
            String command = parts[0].toLowerCase(Locale.ROOT);
            switch (command) {
                case "connect": {
                    if (parts.length >= 3) {
                        int peerPort = parsePort(parts[2]);
                        if (peerPort > 0) {
                            chat.connectToPeer(parts[1], peerPort);
                        } else {
                            System.out.println("无效的端口: " + parts[2]);
                        }
                    } else {
                        System.out.println("用法: connect <IP> <端口>");
                    }
                    break;
                }
                case "send": {
                    if (parts.length >= 2) {
                        // Everything after the command word is the message.
                        chat.broadcastMessage(input.split("\\s+", 2)[1]);
                    } else {
                        System.out.println("用法: send <消息>");
                    }
                    break;
                }
                case "file": {
                    if (parts.length >= 3) {
                        chat.sendFile(parts[1], parts[2]);
                    } else {
                        System.out.println("用法: file <对等端> <文件路径>");
                    }
                    break;
                }
                case "list":
                    chat.listPeers();
                    break;
                case "quit":
                    quit = true;
                    break;
                default:
                    System.out.println("未知命令: " + command);
            }
        }
        chat.shutdown();
        scanner.close();
        System.exit(0);
    }
}