|
|
import sqlite3
|
|
|
import time
|
|
|
import requests
|
|
|
import os
|
|
|
import json
|
|
|
import shutil
|
|
|
from pathlib import Path
|
|
|
|
|
|
# --- 配置区 ---
|
|
|
# 数据库路径
|
|
|
db_path = Path.home() / "Library/Application Support/Cursor/User/globalStorage/state.vscdb"
|
|
|
# 临时文件路径(避免锁定)
|
|
|
temp_db_path = Path.home() / "Documents/cursor_history_temp.vscdb"
|
|
|
# 你的服务器接收地址
|
|
|
server_url = "https://your-server.com/api/forward"
|
|
|
|
|
|
def get_latest_chat_data():
|
|
|
"""从 SQLite 提取最可能的 AI 对话数据"""
|
|
|
try:
|
|
|
if not db_path.exists():
|
|
|
print(f"❌ 找不到数据库: {db_path}")
|
|
|
return None
|
|
|
|
|
|
# 复制副本以防 Cursor 锁定文件
|
|
|
shutil.copy2(db_path, temp_db_path)
|
|
|
|
|
|
conn = sqlite3.connect(str(temp_db_path))
|
|
|
cursor = conn.cursor()
|
|
|
|
|
|
# 优先级:Composer 数据 > 传统 Chat 数据
|
|
|
potential_keys = [
|
|
|
'composer.composerData',
|
|
|
'workbench.panel.ai.chat.history',
|
|
|
'cursor.chat.history'
|
|
|
]
|
|
|
|
|
|
result_json = None
|
|
|
for key in potential_keys:
|
|
|
cursor.execute("SELECT value FROM ItemTable WHERE key = ? LIMIT 1", (key,))
|
|
|
row = cursor.fetchone()
|
|
|
if row and len(row[0]) > 100: # 过滤掉过短的空状态
|
|
|
result_json = row[0]
|
|
|
break
|
|
|
|
|
|
conn.close()
|
|
|
if temp_db_path.exists():
|
|
|
os.remove(temp_db_path)
|
|
|
|
|
|
return result_json
|
|
|
except Exception as e:
|
|
|
print(f"读取出错: {e}")
|
|
|
return None
|
|
|
|
|
|
def extract_content(raw_json):
|
|
|
"""解析复杂的 Cursor JSON 结构,提取最近的对话"""
|
|
|
try:
|
|
|
data = json.loads(raw_json)
|
|
|
messages = []
|
|
|
|
|
|
# 逻辑 1: 处理 Composer 数据结构
|
|
|
if 'allComposers' in data:
|
|
|
# 取最近一个 composer 会话
|
|
|
latest_comp = data['allComposers'][-1]
|
|
|
conversations = latest_comp.get('conversation', [])
|
|
|
for msg in conversations:
|
|
|
role = "USER" if msg.get('type') == 1 else "AI"
|
|
|
# Composer 的文本可能在 text 或 richText 中
|
|
|
text = msg.get('text', '')
|
|
|
if text:
|
|
|
messages.append({"role": role, "content": text})
|
|
|
|
|
|
# 逻辑 2: 处理传统 Chat 数据结构 (tabs 模式)
|
|
|
elif 'tabs' in data:
|
|
|
latest_tab = data['tabs'][-1]
|
|
|
bubbles = latest_tab.get('bubbles', [])
|
|
|
for b in bubbles:
|
|
|
role = "USER" if b.get('type') == 'user' else "AI"
|
|
|
text = b.get('text', '')
|
|
|
if text:
|
|
|
messages.append({"role": role, "content": text})
|
|
|
|
|
|
return messages
|
|
|
except:
|
|
|
return []
|
|
|
|
|
|
def main():
|
|
|
last_snapshot = ""
|
|
|
print(f"🚀 Cursor 监控已启动 (Mac版)")
|
|
|
print(f"📂 监听路径: {db_path}")
|
|
|
print("💡 请在 Cursor 中输入消息,稍等片刻即可看到打印...")
|
|
|
|
|
|
while True:
|
|
|
raw_data = get_latest_chat_data()
|
|
|
|
|
|
if raw_data and raw_data != last_snapshot:
|
|
|
messages = extract_content(raw_data)
|
|
|
|
|
|
if messages:
|
|
|
# 获取最后一条对话
|
|
|
latest_msg = messages[-1]
|
|
|
|
|
|
print("\n" + " ✨ " + "="*50 + " ✨ ")
|
|
|
print(f"⏰ 更新时间: {time.strftime('%H:%M:%S')}")
|
|
|
|
|
|
# 打印最后两条以显示上下文
|
|
|
for m in messages[-2:]:
|
|
|
role_icon = "👤" if m['role'] == "USER" else "🤖"
|
|
|
print(f"{role_icon} {m['role']}: {m['content'][:200].strip()}...")
|
|
|
|
|
|
print(" ✨ " + "="*50 + " ✨ \n")
|
|
|
|
|
|
# 转发到服务器
|
|
|
try:
|
|
|
# 可以选择发送整段对话 messages,或者只发最后一条 latest_msg
|
|
|
requests.post(server_url, json={"history": messages, "source": "cursor-mac"}, timeout=5)
|
|
|
except:
|
|
|
pass
|
|
|
|
|
|
last_snapshot = raw_data
|
|
|
|
|
|
time.sleep(3) # 每3秒检查一次磁盘写入
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
try:
|
|
|
main()
|
|
|
except KeyboardInterrupt:
|
|
|
print("\n👋 监控已停止") |