You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

510 lines
24 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
DeepSeek AI助手 - 聊天管理模块
该模块实现了聊天功能的核心逻辑包括消息发送、AI回复处理、聊天显示更新等功能。
使用多线程处理API请求确保UI不会卡顿并支持日程和待办事项的自动识别与创建。
"""
# 导入标准库模块
import tkinter as tk # 基础GUI库
import customtkinter as ctk # 现代化GUI组件库
from datetime import datetime # 日期时间处理
import threading # 多线程支持
import re # 正则表达式
import uuid # 唯一ID生成
class ChatManager:
"""聊天管理器类
负责处理聊天功能的核心逻辑包括消息发送、AI回复获取、聊天界面更新等。
该类是UI和API之间的桥梁协调消息的发送与接收并处理相关的业务逻辑。
"""
def __init__(self, app):
"""初始化聊天管理器
参数:
app: 应用程序主实例,包含所有必要的组件和状态
该方法将应用程序实例中的重要属性复制到当前实例中,以便于访问和管理。
"""
self.app = app # 应用程序主实例
self.root = app.root # 主窗口实例
self.messages = app.messages # 当前会话的消息列表
self.current_conversation_id = app.current_conversation_id # 当前对话ID
self.chat_histories = app.chat_histories # 所有对话历史
self.api = app.api # DeepSeek API客户端实例
self.auth = app.auth # 用户认证实例
self.current_session_id = app.current_session_id # 当前会话ID
def send_message(self):
"""发送用户消息
该方法是消息发送的入口点,负责:
1. 从UI输入框获取用户消息
2. 验证消息内容的有效性
3. 清空输入框
4. 将用户消息添加到消息列表和当前对话历史
5. 更新聊天界面显示
6. 滚动到底部
7. 在新线程中异步获取AI回复避免阻塞UI
"""
# 安全检查确保UI组件已经初始化
if not hasattr(self.app, 'ui_components'):
return
# 安全检查确保get_user_input方法存在
if not hasattr(self.app.ui_components, 'get_user_input'):
return
# 获取用户输入框组件
user_input = self.app.ui_components.get_user_input()
# 安全检查确保user_input组件有get方法
if not hasattr(user_input, 'get'):
return
message = user_input.get().strip() # 获取并清理用户输入内容
# 空消息检查
if not message:
return
# 清空输入框,准备下一次输入
user_input.delete(0, tk.END)
# 创建用户消息对象
user_message = {
"role": "user", # 角色:用户
"content": message, # 消息内容
"timestamp": datetime.now().isoformat() # 消息发送时间ISO格式
}
# 添加到当前消息列表
self.messages.append(user_message)
# 更新当前对话历史
if self.current_conversation_id:
# 将消息添加到当前对话的历史记录中
self.chat_histories[self.current_conversation_id]["messages"].append(user_message)
# 如果是新对话,使用第一条消息内容作为对话标题
if self.chat_histories[self.current_conversation_id]["title"] == "新对话":
self.chat_histories[self.current_conversation_id]["title"] = message[:50] + ("..." if len(message) > 50 else "")
# 更新对话列表显示
if hasattr(self.app, 'conversation_manager'):
self.app.conversation_manager._update_conversation_list()
# 更新聊天界面显示,让用户看到自己发送的消息
self.update_chat_display()
# 自动滚动到底部,确保最新消息可见
self._scroll_to_bottom()
# 保存当前对话ID确保AI回复能正确添加到原对话
current_conv_id = self.current_conversation_id
# 在新线程中获取AI回复避免阻塞UI界面
# daemon=True表示线程随主线程退出而退出
threading.Thread(target=self._get_ai_response, args=(message, current_conv_id), daemon=True).start()
def _get_ai_response(self, message, original_conv_id):
"""获取AI回复
参数:
message: 用户输入的消息内容
original_conv_id: 发起请求时的对话ID确保回复添加到正确的对话中
该方法在独立线程中执行,负责:
1. 添加"思考中"的AI消息
2. 解析用户输入中的日程信息(如果有)
3. 解析用户输入中的待办事项(如果有)
4. 准备完整的对话上下文
5. 调用API获取AI回复
6. 将创建的日程和待办事项信息添加到回复中
7. 更新消息列表和对话历史
8. 处理可能的异常情况
"""
try:
# 添加"思考中"的AI消息提升用户体验
# 为当前消息列表和原始对话历史创建独立的消息对象
current_thinking_msg = {
"role": "assistant",
"content": "",
"timestamp": datetime.now().isoformat(),
"thinking": True
}
# 更新当前消息列表
self.messages.append(current_thinking_msg)
# 同时更新原始对话的历史记录,确保切换对话后也能正确处理
if original_conv_id and original_conv_id in self.chat_histories:
original_thinking_msg = {
"role": "assistant",
"content": "",
"timestamp": current_thinking_msg["timestamp"],
"thinking": True
}
self.chat_histories[original_conv_id]["messages"].append(original_thinking_msg)
self.chat_histories[original_conv_id]["last_updated"] = datetime.now().isoformat()
# 立即更新UI显示让用户看到"思考中"状态
if hasattr(self, 'root') and self.root:
self.root.after(0, self.update_chat_display)
self.root.after(0, self._scroll_to_bottom)
# 尝试解析用户输入是否包含日程信息
schedule_data = self.api.parse_schedule(message)
schedule_created = False
todo_created = False
# 如果解析出有效的日程信息
if schedule_data and "title" in schedule_data and schedule_data["title"]:
# 检查当前会话是否有效
current_user = self.app.auth.get_current_user(self.app.current_session_id)
if current_user:
# 创建日程传递session_id作为参数
success, schedule = self.app.auth.create_schedule(self.app.current_session_id, schedule_data)
if success:
schedule_created = True
# 准备日程创建成功的提示信息
schedule_info = f"已为您创建日程:\n"
schedule_info += f"标题:{schedule['title']}\n"
if schedule['start_time']:
schedule_info += f"开始时间:{schedule['start_time']}\n"
if schedule['end_time']:
schedule_info += f"结束时间:{schedule['end_time']}\n"
if schedule['location']:
schedule_info += f"地点:{schedule['location']}\n"
if schedule['description']:
schedule_info += f"描述:{schedule['description']}\n"
# 尝试解析用户输入是否包含待办事项信息
todo_data = self.api.parse_todo(message)
if todo_data and "title" in todo_data and todo_data["title"]:
# 检查当前会话是否有效
current_user = self.app.auth.get_current_user(self.app.current_session_id)
if current_user:
# 创建待办事项传递session_id作为参数
success, todo = self.app.auth.create_todo(self.app.current_session_id, todo_data)
if success:
todo_created = True
# 准备待办事项创建成功的提示信息
todo_info = f"已为您创建待办事项:\n"
todo_info += f"标题:{todo['title']}\n"
if todo['due_date']:
todo_info += f"截止日期:{todo['due_date']}\n"
if todo['priority']:
todo_info += f"优先级:{todo['priority']}\n"
if todo['description']:
todo_info += f"描述:{todo['description']}\n"
# 准备完整对话上下文用于API调用
# 使用原始对话的历史记录,确保即使切换对话也能获取正确的上下文
conversation = []
if original_conv_id and original_conv_id in self.chat_histories:
original_messages = self.chat_histories[original_conv_id]["messages"]
# 获取所有非当前思考中的原始对话消息
for msg in original_messages[:-1]: # 不包括当前思考中的消息
if msg["role"] in ["user", "assistant"]: # 只包含用户和助手的消息
conversation.append({
"role": msg["role"],
"content": msg["content"]
})
else:
# 如果找不到原始对话历史,使用当前消息列表作为备选
for msg in self.messages[:-1]: # 不包括当前思考中的消息
if msg["role"] in ["user", "assistant"]: # 只包含用户和助手的消息
conversation.append({
"role": msg["role"],
"content": msg["content"]
})
# 调用API获取AI回复使用完整对话上下文
print(f"[DEBUG] 调用API对话ID: {original_conv_id}")
print(f"[DEBUG] 对话上下文: {conversation}")
response = self.api.call_api(conversation)
print(f"[DEBUG] API响应: {response[:50]}...") # 只打印前50个字符
# 检查API调用是否返回了错误信息
api_error = False
if response.startswith("错误:"):
api_error = True
print(f"[DEBUG] API调用错误: {response}")
# 如果创建了日程,将日程信息添加到回复开头
if schedule_created:
response = schedule_info + "\n\n" + response
# 如果创建了待办事项,将待办事项信息添加到回复结尾
if todo_created:
response += "\n\n" + todo_info
# 替换思考中的消息为实际的AI回复
print(f"[DEBUG] 准备AI回复对话ID: {original_conv_id}")
assistant_msg = {
"role": "assistant",
"content": response,
"timestamp": datetime.now().isoformat()
}
# 检查当前是否仍在原对话中
print(f"[DEBUG] 当前对话ID: {self.current_conversation_id}, 原对话ID: {original_conv_id}")
if self.current_conversation_id == original_conv_id:
# 如果是原对话,直接更新当前消息列表
replaced = False
for i, msg in enumerate(self.messages):
if msg.get("thinking", False):
print(f"[DEBUG] 更新当前消息列表中的思考消息")
self.messages[i] = assistant_msg
replaced = True
break
if not replaced:
self.messages.append(assistant_msg)
# 确保回复始终添加到原对话的历史记录中
if original_conv_id and original_conv_id in self.chat_histories:
print(f"[DEBUG] 更新原对话历史对话ID: {original_conv_id}")
# 找到并替换原对话历史中的"思考中"消息
original_messages = self.chat_histories[original_conv_id]["messages"]
# 遍历查找所有思考中的消息并替换
replaced = False
for i, msg in enumerate(original_messages):
if msg.get("thinking", False):
print(f"[DEBUG] 替换原对话历史中位置 {i} 的思考消息")
original_messages[i] = assistant_msg
replaced = True
break
if not replaced:
# 如果没有"思考中"消息(可能被其他操作删除),直接添加
print(f"[DEBUG] 未找到思考中消息,直接添加回复到原对话历史")
original_messages.append(assistant_msg)
# 更新原对话的最后更新时间
self.chat_histories[original_conv_id]["last_updated"] = datetime.now().isoformat()
except Exception as e:
# 捕获所有异常,确保线程不会意外终止
error_msg = {
"role": "assistant",
"content": f"抱歉,发生错误:{str(e)}",
"timestamp": datetime.now().isoformat()
}
# 检查当前是否仍在原对话中
if self.current_conversation_id == original_conv_id:
# 如果是原对话,直接更新当前消息列表
replaced = False
for i, msg in enumerate(self.messages):
if msg.get("thinking", False):
self.messages[i] = error_msg
replaced = True
break
if not replaced:
# 否则直接添加错误消息
self.messages.append(error_msg)
# 确保错误消息始终添加到原对话的历史记录中
if original_conv_id and original_conv_id in self.chat_histories:
# 找到并替换原对话历史中的"思考中"消息
original_messages = self.chat_histories[original_conv_id]["messages"]
replaced = False
for i, msg in enumerate(original_messages):
if msg.get("thinking", False):
original_messages[i] = error_msg
replaced = True
break
if not replaced:
# 如果没有"思考中"消息(可能被其他操作删除),直接添加
original_messages.append(error_msg)
# 更新原对话的最后更新时间
self.chat_histories[original_conv_id]["last_updated"] = datetime.now().isoformat()
# 更新聊天显示让用户看到AI回复或错误信息
print(f"[DEBUG] 更新UI显示当前对话ID: {self.current_conversation_id}, 原对话ID: {original_conv_id}")
if hasattr(self, 'root') and self.root:
if self.current_conversation_id == original_conv_id:
print(f"[DEBUG] 在原对话中更新UI")
self.root.after(0, self.update_chat_display)
self.root.after(0, self._scroll_to_bottom)
# 保存聊天历史到文件
if hasattr(self.app, 'save_chat_history'):
if hasattr(self, 'root') and self.root:
self.root.after(0, self.app.save_chat_history)
else:
# 如果没有UI组件例如在测试环境中直接保存
self.app.save_chat_history()
def update_chat_display(self):
"""更新聊天显示界面
该方法负责清空聊天区域并重新显示所有消息。它会:
1. 检查UI组件是否已经初始化
2. 清空聊天滚动区域中的所有子组件
3. 遍历所有消息调用_display_message方法逐个显示
"""
# 安全检查确保UI组件已经初始化
if not hasattr(self.app, 'ui_components'):
return
# 清空聊天区域,准备重新绘制
for widget in self.app.ui_components.get_chat_scrollable_frame().winfo_children():
widget.destroy()
# 遍历所有消息并显示
for msg in self.messages:
self._display_message(msg)
def _display_message(self, msg):
"""显示单条消息
参数:
msg: 消息对象,格式为{
"role": "user/assistant",
"content": "消息内容",
"timestamp": "ISO格式时间",
"thinking": True/False (可选)
}
该方法根据消息类型(用户或助手)创建不同风格的消息气泡:
- 用户消息:右对齐,蓝色气泡
- 助手消息:左对齐,灰色气泡
每条消息包含:
1. 时间戳
2. 发送者名称
3. 消息内容气泡
"""
# 安全检查确保UI组件已经初始化
if not hasattr(self.app, 'ui_components'):
return
if msg["role"] == "user":
# 用户消息气泡(右对齐)
msg_frame = ctk.CTkFrame(self.app.ui_components.get_chat_scrollable_frame(), fg_color="#1e1e1e")
msg_frame.pack(fill="x", padx=10, pady=5, anchor="e") # 右对齐
# 时间戳处理和显示
timestamp = msg["timestamp"]
if isinstance(timestamp, str):
timestamp = datetime.fromisoformat(timestamp)
timestamp_label = ctk.CTkLabel(
msg_frame,
text=timestamp.strftime("%H:%M"), # 格式化为小时:分钟
font=ctk.CTkFont(size=10),
text_color="#999999"
)
timestamp_label.pack(side="right", padx=(0, 10), pady=(0, 2))
# 用户名显示
user_label = ctk.CTkLabel(
msg_frame,
text="", # 用户名称固定为"我"
font=ctk.CTkFont(size=12, weight="bold"),
text_color="#66b3ff"
)
user_label.pack(side="right", padx=(5, 10), anchor="ne")
# 消息内容气泡
bubble_frame = ctk.CTkFrame(
msg_frame,
fg_color="#0078d4", # 蓝色气泡
corner_radius=18, # 圆角设计
width=400
)
bubble_frame.pack(side="right", padx=10, pady=5, fill="x", expand=False, anchor="ne")
content_label = ctk.CTkLabel(
bubble_frame,
text=msg["content"],
font=ctk.CTkFont(family="Microsoft YaHei UI", size=14),
text_color="#ffffff",
wraplength=450, # 自动换行宽度
justify="left"
)
content_label.pack(padx=20, pady=12, anchor="ne")
else:
# 助手消息气泡(左对齐)
msg_frame = ctk.CTkFrame(self.app.ui_components.get_chat_scrollable_frame(), fg_color="#1e1e1e")
msg_frame.pack(fill="x", padx=10, pady=5, anchor="w") # 左对齐
# 时间戳处理和显示
timestamp = msg["timestamp"]
if isinstance(timestamp, str):
timestamp = datetime.fromisoformat(timestamp)
timestamp_label = ctk.CTkLabel(
msg_frame,
text=timestamp.strftime("%H:%M"), # 格式化为小时:分钟
font=ctk.CTkFont(size=10),
text_color="#999999"
)
timestamp_label.pack(side="left", padx=(10, 0), pady=(0, 2))
# 助手名称显示
assistant_label = ctk.CTkLabel(
msg_frame,
text="DeepSeek", # 助手固定名称
font=ctk.CTkFont(size=12, weight="bold"),
text_color="#4ec9b0"
)
assistant_label.pack(side="left", padx=(10, 5), anchor="nw")
# 消息内容气泡
bubble_frame = ctk.CTkFrame(
msg_frame,
fg_color="#333333", # 灰色气泡
corner_radius=18, # 圆角设计
width=400
)
bubble_frame.pack(side="left", padx=10, pady=5, fill="x", expand=False, anchor="nw")
# 如果是思考中的消息,显示"思考中...",否则显示实际内容
content = msg["content"] if not msg.get("thinking") else "思考中..."
content_label = ctk.CTkLabel(
bubble_frame,
text=content,
font=ctk.CTkFont(family="Microsoft YaHei UI", size=14),
text_color="#ffffff",
wraplength=450, # 自动换行宽度
justify="left"
)
content_label.pack(padx=20, pady=12, anchor="nw")
def _scroll_to_bottom(self):
"""滚动聊天窗口到底部
该方法使用root.after延迟执行滚动操作确保UI更新完成后再滚动。
这样可以避免滚动到错误的位置(如果消息还在渲染中)。
"""
# 使用after方法确保UI更新完成后再滚动
self.root.after(100, self._do_scroll_to_bottom)
def _do_scroll_to_bottom(self):
"""执行滚动到底部的实际操作
该方法尝试获取聊天滚动区域的画布组件并调用yview_moveto(1.0)将其滚动到底部。
如果出现任何异常,会优雅地处理,确保程序不会崩溃。
"""
# 获取聊天滚动区域组件
if hasattr(self.app.ui_components, 'get_chat_scrollable_frame'):
chat_frame = self.app.ui_components.get_chat_scrollable_frame()
# 检查scrollable_frame是否有_parent_canvas属性customtkinter的内部属性
if hasattr(chat_frame, '_parent_canvas'):
try:
# 将画布滚动到底部
chat_frame._parent_canvas.yview_moveto(1.0)
except Exception:
# 如果滚动失败,至少确保聊天显示已更新
pass