import sqlite3 import time import requests import os import json import shutil from pathlib import Path # --- 配置区 --- # 数据库路径 db_path = Path.home() / "Library/Application Support/Cursor/User/globalStorage/state.vscdb" # 临时文件路径(避免锁定) temp_db_path = Path.home() / "Documents/cursor_history_temp.vscdb" # 你的服务器接收地址 server_url = "https://your-server.com/api/forward" def get_latest_chat_data(): """从 SQLite 提取最可能的 AI 对话数据""" try: if not db_path.exists(): print(f"❌ 找不到数据库: {db_path}") return None # 复制副本以防 Cursor 锁定文件 shutil.copy2(db_path, temp_db_path) conn = sqlite3.connect(str(temp_db_path)) cursor = conn.cursor() # 优先级:Composer 数据 > 传统 Chat 数据 potential_keys = [ 'composer.composerData', 'workbench.panel.ai.chat.history', 'cursor.chat.history' ] result_json = None for key in potential_keys: cursor.execute("SELECT value FROM ItemTable WHERE key = ? LIMIT 1", (key,)) row = cursor.fetchone() if row and len(row[0]) > 100: # 过滤掉过短的空状态 result_json = row[0] break conn.close() if temp_db_path.exists(): os.remove(temp_db_path) return result_json except Exception as e: print(f"读取出错: {e}") return None def extract_content(raw_json): """解析复杂的 Cursor JSON 结构,提取最近的对话""" try: data = json.loads(raw_json) messages = [] # 逻辑 1: 处理 Composer 数据结构 if 'allComposers' in data: # 取最近一个 composer 会话 latest_comp = data['allComposers'][-1] conversations = latest_comp.get('conversation', []) for msg in conversations: role = "USER" if msg.get('type') == 1 else "AI" # Composer 的文本可能在 text 或 richText 中 text = msg.get('text', '') if text: messages.append({"role": role, "content": text}) # 逻辑 2: 处理传统 Chat 数据结构 (tabs 模式) elif 'tabs' in data: latest_tab = data['tabs'][-1] bubbles = latest_tab.get('bubbles', []) for b in bubbles: role = "USER" if b.get('type') == 'user' else "AI" text = b.get('text', '') if text: messages.append({"role": role, "content": text}) return messages except: return [] def main(): last_snapshot = "" print(f"🚀 Cursor 监控已启动 (Mac版)") print(f"📂 监听路径: {db_path}") print("💡 请在 Cursor 中输入消息,稍等片刻即可看到打印...") while True: raw_data = get_latest_chat_data() if raw_data and raw_data != last_snapshot: messages = extract_content(raw_data) if messages: # 获取最后一条对话 latest_msg = messages[-1] print("\n" + " ✨ " + "="*50 + " ✨ ") print(f"⏰ 更新时间: {time.strftime('%H:%M:%S')}") # 打印最后两条以显示上下文 for m in messages[-2:]: role_icon = "👤" if m['role'] == "USER" else "🤖" print(f"{role_icon} {m['role']}: {m['content'][:200].strip()}...") print(" ✨ " + "="*50 + " ✨ \n") # 转发到服务器 try: # 可以选择发送整段对话 messages,或者只发最后一条 latest_msg requests.post(server_url, json={"history": messages, "source": "cursor-mac"}, timeout=5) except: pass last_snapshot = raw_data time.sleep(3) # 每3秒检查一次磁盘写入 if __name__ == "__main__": try: main() except KeyboardInterrupt: print("\n👋 监控已停止")