You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

679 lines
22 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# P2P Network Communication - File Transfer Widget
"""
文件传输界面组件
实现文件选择、传输进度显示和图片预览
需求: 4.1, 4.3, 4.6, 5.2, 5.3, 5.4, 5.6
"""
from __future__ import annotations
import logging
import os
from typing import Optional, Dict, TYPE_CHECKING
if TYPE_CHECKING:
from typing import Type
from PyQt6.QtWidgets import (
QWidget, QVBoxLayout, QHBoxLayout, QLabel, QPushButton,
QProgressBar, QFileDialog, QScrollArea, QFrame, QMessageBox,
QDialog, QSlider, QSizePolicy
)
from PyQt6.QtCore import Qt, pyqtSignal, QSize
from PyQt6.QtGui import QPixmap, QTransform
from shared.models import TransferProgress, TransferStatus
logger = logging.getLogger(__name__)
class TransferItem(QFrame):
    """Row widget that visualises a single file transfer.

    The row shows the file name (prefixed with an upload/download icon), the
    formatted total size, a cancel button, a progress bar, a status text and
    the current transfer speed.

    Signals:
        cancel_requested(str): emitted with this item's ``file_id`` when the
            cancel button is clicked.
    """

    cancel_requested = pyqtSignal(str)  # file_id

    # Progress-bar chunk colours for the running / completed / failed states.
    _CHUNK_ACTIVE = "#4a90d9"
    _CHUNK_DONE = "#4caf50"
    _CHUNK_FAILED = "#f44336"

    # Single stylesheet template; ``%s`` is the chunk colour.  Kept in one
    # place so the three visual states cannot drift apart.
    _PROGRESS_STYLE = """
        QProgressBar {
            border: 1px solid #ddd;
            border-radius: 3px;
            text-align: center;
            height: 20px;
        }
        QProgressBar::chunk {
            background-color: %s;
            border-radius: 2px;
        }
    """

    def __init__(self, file_id: str, file_name: str, total_size: int,
                 is_upload: bool = True, parent=None):
        """Create the row.

        Args:
            file_id: Identifier emitted by ``cancel_requested``.
            file_name: Name displayed next to the direction icon.
            total_size: File size in bytes, shown in human-readable form.
            is_upload: ``True`` shows the upload icon, ``False`` the download
                icon.
            parent: Optional parent widget.
        """
        super().__init__(parent)
        self.file_id = file_id
        self.file_name = file_name
        self.total_size = total_size
        self.is_upload = is_upload
        self._setup_ui()

    def _setup_ui(self) -> None:
        """Build the info row, the progress bar and the status row."""
        self.setFrameShape(QFrame.Shape.StyledPanel)
        self.setStyleSheet("""
            QFrame {
                background-color: white;
                border: 1px solid #ddd;
                border-radius: 5px;
                margin: 2px;
            }
        """)

        layout = QVBoxLayout(self)
        layout.setContentsMargins(10, 10, 10, 10)
        layout.setSpacing(5)

        # --- file info row: icon + name | size | cancel ---
        info_layout = QHBoxLayout()

        icon = "⬆️" if self.is_upload else "⬇️"
        self._name_label = QLabel(f"{icon} {self.file_name}")
        self._name_label.setStyleSheet("font-weight: bold;")
        info_layout.addWidget(self._name_label, 1)

        self._size_label = QLabel(self._format_size(self.total_size))
        self._size_label.setStyleSheet("color: #666;")
        info_layout.addWidget(self._size_label)

        # The button is borderless, so without a glyph it would be invisible;
        # the stylesheet below only colours the text.
        self._cancel_btn = QPushButton("✕")
        self._cancel_btn.setFixedSize(20, 20)
        self._cancel_btn.setStyleSheet("""
            QPushButton {
                border: none;
                color: #999;
            }
            QPushButton:hover {
                color: red;
            }
        """)
        self._cancel_btn.clicked.connect(
            lambda: self.cancel_requested.emit(self.file_id)
        )
        info_layout.addWidget(self._cancel_btn)

        layout.addLayout(info_layout)

        # --- progress bar ---
        self._progress_bar = QProgressBar()
        self._progress_bar.setRange(0, 100)
        self._progress_bar.setValue(0)
        self._progress_bar.setTextVisible(True)
        self._progress_bar.setStyleSheet(self._progress_style(self._CHUNK_ACTIVE))
        layout.addWidget(self._progress_bar)

        # --- status row: status text on the left, speed on the right ---
        status_layout = QHBoxLayout()

        self._status_label = QLabel("等待中...")
        self._status_label.setStyleSheet("color: #666; font-size: 12px;")
        status_layout.addWidget(self._status_label)

        status_layout.addStretch()

        self._speed_label = QLabel("")
        self._speed_label.setStyleSheet("color: #666; font-size: 12px;")
        status_layout.addWidget(self._speed_label)

        layout.addLayout(status_layout)

    def _progress_style(self, chunk_color: str) -> str:
        """Return the progress-bar stylesheet with the given chunk colour."""
        return self._PROGRESS_STYLE % chunk_color

    def _format_size(self, size: int) -> str:
        """Format a byte count as B / KB / MB / GB (1024-based)."""
        if size < 1024:
            return f"{size} B"
        elif size < 1024 * 1024:
            return f"{size / 1024:.1f} KB"
        elif size < 1024 * 1024 * 1024:
            return f"{size / (1024 * 1024):.1f} MB"
        else:
            return f"{size / (1024 * 1024 * 1024):.2f} GB"

    def _format_speed(self, speed: float) -> str:
        """Format a speed as ``<size>/s``; the value is truncated to an int."""
        return f"{self._format_size(int(speed))}/s"

    def update_progress(self, progress: TransferProgress) -> None:
        """Refresh the bar, speed and status text from a progress report.

        Args:
            progress: Report providing ``progress_percent``, ``speed`` and
                ``eta``.
        """
        # QProgressBar ignores values outside its range, which would leave a
        # stale reading on screen; clamp so the bar always follows the report.
        percent = max(0, min(100, int(progress.progress_percent)))
        self._progress_bar.setValue(percent)

        self._speed_label.setText(self._format_speed(progress.speed))

        # NOTE(review): the ETA is rendered without a unit; confirm the unit
        # of TransferProgress.eta before appending one to this text.
        if progress.eta > 0:
            eta_str = f"剩余 {int(progress.eta)}"
        else:
            eta_str = ""

        self._status_label.setText(f"传输中... {eta_str}")

    def set_status(self, status: TransferStatus) -> None:
        """Show the given status and adapt the row to terminal states.

        COMPLETED fills the bar and turns it green, FAILED turns it red, and
        both of them as well as CANCELLED hide the cancel button, since there
        is nothing left to cancel.

        Args:
            status: New transfer status; unknown values display "未知".
        """
        status_texts = {
            TransferStatus.PENDING: "等待中...",
            TransferStatus.IN_PROGRESS: "传输中...",
            TransferStatus.COMPLETED: "✓ 完成",
            TransferStatus.FAILED: "✕ 失败",
            TransferStatus.CANCELLED: "已取消",
            TransferStatus.PAUSED: "已暂停",
        }
        self._status_label.setText(status_texts.get(status, "未知"))

        if status == TransferStatus.COMPLETED:
            self._progress_bar.setValue(100)
            self._cancel_btn.hide()
            self._progress_bar.setStyleSheet(
                self._progress_style(self._CHUNK_DONE)
            )
        elif status == TransferStatus.FAILED:
            self._cancel_btn.hide()
            self._progress_bar.setStyleSheet(
                self._progress_style(self._CHUNK_FAILED)
            )
        elif status == TransferStatus.CANCELLED:
            # Already cancelled: a second cancel request would be meaningless.
            self._cancel_btn.hide()
class FileTransferWidget(QWidget):
    """Panel listing file transfers plus helpers for file/image dialogs.

    Covers the file selection dialog (requirement 4.1), transfer progress
    display (requirement 4.3) and image preview/viewing (requirements 5.2,
    5.3, 5.4, 5.6).

    Signals:
        send_file_requested(str, str): ``peer_id`` and ``file_path`` of a file
            the user picked in one of the open dialogs.
        cancel_transfer_requested(str): ``file_id`` of a transfer whose cancel
            button was clicked.
    """

    # Signal definitions
    send_file_requested = pyqtSignal(str, str)  # peer_id, file_path
    cancel_transfer_requested = pyqtSignal(str)  # file_id

    def __init__(self, parent=None):
        """Create an empty transfer list.

        Args:
            parent: Optional parent widget.
        """
        super().__init__(parent)
        self._transfers: Dict[str, TransferItem] = {}
        # Last status reported through update_transfer_status(), per file_id.
        # Lets _clear_completed() decide from the real status instead of
        # guessing from the progress bar.
        self._statuses: Dict[str, TransferStatus] = {}
        self._setup_ui()
        logger.info("FileTransferWidget initialized")

    def _setup_ui(self) -> None:
        """Build the header bar, the scrollable transfer list and the
        empty-state label."""
        layout = QVBoxLayout(self)
        layout.setContentsMargins(0, 0, 0, 0)
        layout.setSpacing(0)

        # --- header bar ---
        header = QFrame()
        header.setFixedHeight(40)
        header.setStyleSheet("""
            QFrame {
                background-color: #f5f5f5;
                border-bottom: 1px solid #ddd;
            }
        """)
        header_layout = QHBoxLayout(header)
        header_layout.setContentsMargins(10, 0, 10, 0)

        title_label = QLabel("文件传输")
        title_label.setStyleSheet("font-weight: bold;")
        header_layout.addWidget(title_label)

        header_layout.addStretch()

        self._clear_btn = QPushButton("清除已完成")
        self._clear_btn.clicked.connect(self._clear_completed)
        header_layout.addWidget(self._clear_btn)

        layout.addWidget(header)

        # --- transfer list ---
        scroll_area = QScrollArea()
        scroll_area.setWidgetResizable(True)
        scroll_area.setHorizontalScrollBarPolicy(
            Qt.ScrollBarPolicy.ScrollBarAlwaysOff
        )

        self._transfer_container = QWidget()
        self._transfer_layout = QVBoxLayout(self._transfer_container)
        self._transfer_layout.setContentsMargins(5, 5, 5, 5)
        self._transfer_layout.setSpacing(5)
        # Trailing stretch keeps the items packed at the top; new items are
        # inserted just before it (see add_transfer).
        self._transfer_layout.addStretch()

        scroll_area.setWidget(self._transfer_container)
        layout.addWidget(scroll_area, 1)

        # --- empty-state hint ---
        self._empty_label = QLabel("暂无传输任务")
        self._empty_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
        self._empty_label.setStyleSheet("color: #999;")
        self._transfer_layout.insertWidget(0, self._empty_label)

    def _is_completed(self, file_id: str, item: TransferItem) -> bool:
        """Tell whether a transfer counts as completed.

        The last status reported via ``update_transfer_status`` is
        authoritative.  Only when no status was ever reported does the check
        fall back to "progress bar reads 100".
        """
        status = self._statuses.get(file_id)
        if status is not None:
            return status == TransferStatus.COMPLETED
        return item._progress_bar.value() == 100

    def _clear_completed(self) -> None:
        """Remove every completed transfer from the list."""
        # Collect first: remove_transfer() mutates self._transfers.
        finished = [
            file_id
            for file_id, item in self._transfers.items()
            if self._is_completed(file_id, item)
        ]
        for file_id in finished:
            self.remove_transfer(file_id)

    def _update_empty_state(self) -> None:
        """Show the empty-state hint only while there are no transfers."""
        self._empty_label.setVisible(len(self._transfers) == 0)

    # ==================== Public API ====================

    def add_transfer(self, file_id: str, file_name: str, total_size: int,
                     is_upload: bool = True) -> None:
        """Add a transfer row; a ``file_id`` that is already listed is ignored.

        Args:
            file_id: File identifier.
            file_name: File name to display.
            total_size: File size in bytes.
            is_upload: Whether this is an upload (otherwise a download).
        """
        if file_id in self._transfers:
            return

        item = TransferItem(file_id, file_name, total_size, is_upload)
        item.cancel_requested.connect(self._on_cancel_requested)

        # Insert before the trailing stretch so items stay top-aligned.
        self._transfer_layout.insertWidget(
            self._transfer_layout.count() - 1, item
        )
        self._transfers[file_id] = item

        self._update_empty_state()
        logger.debug(f"Transfer added: {file_name}")

    def update_transfer_progress(self, file_id: str,
                                 progress: TransferProgress) -> None:
        """Forward a progress report to the matching row (requirement 4.3).

        Requirement: WHILE a file transfer is in progress THEN P2P_Client
        SHALL display the progress percentage and the transfer speed.

        Unknown ``file_id`` values are ignored.

        Args:
            file_id: File identifier.
            progress: Progress information.
        """
        if file_id in self._transfers:
            self._transfers[file_id].update_progress(progress)

    def update_transfer_status(self, file_id: str,
                               status: TransferStatus) -> None:
        """Forward a status change to the matching row and remember it.

        Unknown ``file_id`` values are ignored.

        Args:
            file_id: File identifier.
            status: New status.
        """
        if file_id in self._transfers:
            self._statuses[file_id] = status
            self._transfers[file_id].set_status(status)

    def remove_transfer(self, file_id: str) -> None:
        """Remove a transfer row; unknown ``file_id`` values are ignored.

        Args:
            file_id: File identifier.
        """
        if file_id not in self._transfers:
            return

        item = self._transfers.pop(file_id)
        self._statuses.pop(file_id, None)
        self._transfer_layout.removeWidget(item)
        item.deleteLater()

        self._update_empty_state()

    def _on_cancel_requested(self, file_id: str) -> None:
        """Re-emit a row's cancel request as ``cancel_transfer_requested``."""
        self.cancel_transfer_requested.emit(file_id)

    def show_file_dialog(self, peer_id: str) -> Optional[str]:
        """Show the file selection dialog (requirement 4.1).

        Requirement: WHEN the user chooses to send a file THEN
        File_Transfer_Module SHALL let the user pick a local file of any type.

        Emits ``send_file_requested(peer_id, file_path)`` when a file is
        chosen.

        Args:
            peer_id: Target user ID.

        Returns:
            The selected file path, or ``None`` if the dialog was cancelled.
        """
        file_path, _ = QFileDialog.getOpenFileName(
            self,
            "选择文件",
            "",
            "所有文件 (*.*)"
        )

        if file_path:
            self.send_file_requested.emit(peer_id, file_path)
            return file_path
        return None

    def show_image_dialog(self, peer_id: str) -> Optional[str]:
        """Show the image selection dialog.

        Emits ``send_file_requested(peer_id, file_path)`` when an image is
        chosen.

        Args:
            peer_id: Target user ID.

        Returns:
            The selected image path, or ``None`` if the dialog was cancelled.
        """
        file_path, _ = QFileDialog.getOpenFileName(
            self,
            "选择图片",
            "",
            "图片文件 (*.jpg *.jpeg *.png *.gif *.bmp);;所有文件 (*.*)"
        )

        if file_path:
            self.send_file_requested.emit(peer_id, file_path)
            return file_path
        return None

    def show_save_dialog(self, file_name: str) -> Optional[str]:
        """Show the save-file dialog (requirement 4.6).

        Requirement: WHEN a file is received THEN P2P_Client SHALL let the
        user choose where to save it.

        Args:
            file_name: Default file name offered in the dialog.

        Returns:
            The chosen save path, or ``None`` if the dialog was cancelled.
        """
        file_path, _ = QFileDialog.getSaveFileName(
            self,
            "保存文件",
            file_name,
            "所有文件 (*.*)"
        )
        return file_path if file_path else None

    def show_image_preview(self, image_path: str) -> None:
        """Open a modal image viewer for the given file (requirement 5.4).

        Requirement: WHEN the user clicks an image thumbnail THEN P2P_Client
        SHALL open the image viewer showing the full image.

        Args:
            image_path: Path of the image to display.
        """
        dialog = ImagePreviewDialog(image_path, self)
        dialog.exec()

    def create_image_thumbnail(self, image_path: str) -> Optional["ImageThumbnail"]:
        """Create a clickable thumbnail widget (requirements 5.2, 5.3).

        Clicking the thumbnail opens the image preview dialog.

        Args:
            image_path: Path of the image.

        Returns:
            The thumbnail widget, or ``None`` if ``image_path`` does not
            exist.
        """
        if not os.path.exists(image_path):
            return None

        thumbnail = ImageThumbnail(image_path)
        thumbnail.clicked.connect(self.show_image_preview)
        return thumbnail
class ImagePreviewDialog(QDialog):
    """Modal image viewer with zoom and rotation (requirements 5.3, 5.4).

    Requirements:
        WHEN an image transfer completes THEN P2P_Client SHALL show the image
        thumbnail directly in the chat window.
        WHEN the user clicks an image thumbnail THEN P2P_Client SHALL open the
        image viewer showing the full image.
    """

    def __init__(self, image_path: str, parent=None):
        """Build the dialog and load the image.

        Args:
            image_path: Path of the image to display.
            parent: Optional parent widget.
        """
        super().__init__(parent)
        self.image_path = image_path
        self._rotation = 0        # degrees, always a multiple of 90 in [0, 360)
        self._zoom_level = 100    # percent, mirrors the slider value
        # Set by _load_image(); stays None when the file does not exist, so
        # _update_display() can bail out without probing for the attribute.
        self._original_pixmap: Optional[QPixmap] = None

        self._setup_ui()
        self._load_image()

    def _setup_ui(self) -> None:
        """Build the scrollable image area and the bottom toolbar."""
        self.setWindowTitle("图片预览")
        self.setMinimumSize(600, 500)
        self.resize(800, 600)

        layout = QVBoxLayout(self)
        layout.setContentsMargins(0, 0, 0, 0)
        layout.setSpacing(0)

        # --- image display area ---
        scroll_area = QScrollArea()
        scroll_area.setWidgetResizable(True)
        scroll_area.setAlignment(Qt.AlignmentFlag.AlignCenter)
        scroll_area.setStyleSheet("background-color: #2d2d2d;")

        self._image_label = QLabel()
        self._image_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
        scroll_area.setWidget(self._image_label)

        layout.addWidget(scroll_area, 1)

        # --- toolbar ---
        toolbar = QFrame()
        toolbar.setFixedHeight(50)
        toolbar.setStyleSheet("""
            QFrame {
                background-color: #f5f5f5;
                border-top: 1px solid #ddd;
            }
        """)
        toolbar_layout = QHBoxLayout(toolbar)
        toolbar_layout.setContentsMargins(10, 5, 10, 5)
        toolbar_layout.setSpacing(10)

        # Zoom controls.  The buttons carry visible glyphs: an empty label
        # would render them as blank, unidentifiable squares.
        zoom_out_btn = QPushButton("-")
        zoom_out_btn.setFixedSize(30, 30)
        zoom_out_btn.clicked.connect(self._zoom_out)
        toolbar_layout.addWidget(zoom_out_btn)

        self._zoom_slider = QSlider(Qt.Orientation.Horizontal)
        self._zoom_slider.setRange(25, 400)
        self._zoom_slider.setValue(100)
        self._zoom_slider.setFixedWidth(150)
        # Connected after the initial setValue so that set-up itself does not
        # trigger a redraw.
        self._zoom_slider.valueChanged.connect(self._on_zoom_changed)
        toolbar_layout.addWidget(self._zoom_slider)

        zoom_in_btn = QPushButton("+")
        zoom_in_btn.setFixedSize(30, 30)
        zoom_in_btn.clicked.connect(self._zoom_in)
        toolbar_layout.addWidget(zoom_in_btn)

        self._zoom_label = QLabel("100%")
        self._zoom_label.setFixedWidth(50)
        toolbar_layout.addWidget(self._zoom_label)

        toolbar_layout.addStretch()

        # Rotation buttons
        rotate_left_btn = QPushButton("↺")
        rotate_left_btn.setFixedSize(30, 30)
        rotate_left_btn.setToolTip("向左旋转")
        rotate_left_btn.clicked.connect(self._rotate_left)
        toolbar_layout.addWidget(rotate_left_btn)

        rotate_right_btn = QPushButton("↻")
        rotate_right_btn.setFixedSize(30, 30)
        rotate_right_btn.setToolTip("向右旋转")
        rotate_right_btn.clicked.connect(self._rotate_right)
        toolbar_layout.addWidget(rotate_right_btn)

        toolbar_layout.addStretch()

        # Close button
        close_btn = QPushButton("关闭")
        close_btn.clicked.connect(self.close)
        toolbar_layout.addWidget(close_btn)

        layout.addWidget(toolbar)

    def _load_image(self) -> None:
        """Load ``image_path``; show an error text if it is missing or
        cannot be decoded."""
        if not os.path.exists(self.image_path):
            self._image_label.setText("图片不存在")
            return

        self._original_pixmap = QPixmap(self.image_path)
        if self._original_pixmap.isNull():
            self._image_label.setText("无法加载图片")
            return

        self._update_display()

    def _update_display(self) -> None:
        """Redraw the image with the current rotation and zoom applied."""
        pixmap = self._original_pixmap
        if pixmap is None or pixmap.isNull():
            return

        # Apply rotation first so the zoom works on the rotated dimensions.
        transform = QTransform()
        transform.rotate(self._rotation)
        rotated = pixmap.transformed(
            transform, Qt.TransformationMode.SmoothTransformation
        )

        # Apply zoom.  Clamp to at least 1 px per side: QPixmap.scaled()
        # returns a null pixmap for a zero width or height, which would blank
        # the view for small images at low zoom levels.
        scale = self._zoom_level / 100.0
        scaled = rotated.scaled(
            max(1, int(rotated.width() * scale)),
            max(1, int(rotated.height() * scale)),
            Qt.AspectRatioMode.KeepAspectRatio,
            Qt.TransformationMode.SmoothTransformation
        )

        self._image_label.setPixmap(scaled)

    def _zoom_in(self) -> None:
        """Zoom in by 25 percentage points, capped at 400%."""
        self._zoom_slider.setValue(min(400, self._zoom_level + 25))

    def _zoom_out(self) -> None:
        """Zoom out by 25 percentage points, floored at 25%."""
        self._zoom_slider.setValue(max(25, self._zoom_level - 25))

    def _on_zoom_changed(self, value: int) -> None:
        """Slider handler: store the zoom level, update the label, redraw."""
        self._zoom_level = value
        self._zoom_label.setText(f"{value}%")
        self._update_display()

    def _rotate_left(self) -> None:
        """Rotate the image 90 degrees counter-clockwise."""
        self._rotation = (self._rotation - 90) % 360
        self._update_display()

    def _rotate_right(self) -> None:
        """Rotate the image 90 degrees clockwise."""
        self._rotation = (self._rotation + 90) % 360
        self._update_display()
class ImageThumbnail(QFrame):
    """Clickable thumbnail of an image file (requirement 5.2).

    Requirement: WHEN an image is about to be sent THEN P2P_Client SHALL show
    a thumbnail preview of it.

    Signals:
        clicked(str): emitted with ``image_path`` on a left mouse press.
    """

    clicked = pyqtSignal(str)  # image_path

    def __init__(self, image_path: str, thumbnail_size: tuple = (150, 150), parent=None):
        """Create the thumbnail.

        Args:
            image_path: Path of the image to preview.
            thumbnail_size: ``(width, height)`` of the preview area in pixels.
            parent: Optional parent widget.
        """
        super().__init__(parent)
        self.image_path = image_path
        self.thumbnail_size = thumbnail_size
        self._setup_ui()

    def _setup_ui(self) -> None:
        """Build the preview label and the (possibly shortened) file name."""
        self.setFrameShape(QFrame.Shape.StyledPanel)
        self.setCursor(Qt.CursorShape.PointingHandCursor)
        self.setStyleSheet(
            "\n"
            "QFrame {\n"
            "background-color: white;\n"
            "border: 1px solid #ddd;\n"
            "border-radius: 5px;\n"
            "}\n"
            "QFrame:hover {\n"
            "border-color: #4a90d9;\n"
            "}\n"
        )

        box = QVBoxLayout(self)
        box.setContentsMargins(5, 5, 5, 5)
        box.setSpacing(5)

        # Preview area: scaled pixmap on success, explanatory text otherwise.
        preview = QLabel()
        preview.setAlignment(Qt.AlignmentFlag.AlignCenter)
        preview.setFixedSize(*self.thumbnail_size)
        self._image_label = preview

        if not os.path.exists(self.image_path):
            preview.setText("图片不存在")
        else:
            source = QPixmap(self.image_path)
            if source.isNull():
                preview.setText("无法加载")
            else:
                preview.setPixmap(
                    source.scaled(
                        *self.thumbnail_size,
                        Qt.AspectRatioMode.KeepAspectRatio,
                        Qt.TransformationMode.SmoothTransformation
                    )
                )
        box.addWidget(preview)

        # Caption: base name, cut to 17 characters plus "..." when it is
        # longer than 20 characters.
        caption_text = os.path.basename(self.image_path)
        if len(caption_text) > 20:
            caption_text = f"{caption_text[:17]}..."

        caption = QLabel(caption_text)
        caption.setAlignment(Qt.AlignmentFlag.AlignCenter)
        caption.setStyleSheet("font-size: 11px; color: #666;")
        box.addWidget(caption)

    def mousePressEvent(self, event) -> None:
        """Emit ``clicked`` on a left-button press, then defer to QFrame."""
        left_pressed = event.button() == Qt.MouseButton.LeftButton
        if left_pressed:
            self.clicked.emit(self.image_path)
        super().mousePressEvent(event)