You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

127 lines
4.2 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import sqlite3
import time
import requests
import os
import json
import shutil
from pathlib import Path
# --- Configuration ---
# Path to Cursor's global state database (under the user's macOS Library folder)
db_path = Path.home() / "Library/Application Support/Cursor/User/globalStorage/state.vscdb"
# Path of the temporary copy that is actually queried (avoids file locking)
temp_db_path = Path.home() / "Documents/cursor_history_temp.vscdb"
# URL of your server endpoint that receives the forwarded conversation
server_url = "https://your-server.com/api/forward"
def get_latest_chat_data(source_db=None, scratch_db=None):
    """Return the raw JSON of the most likely AI chat record in Cursor's state DB.

    The database file is copied to a scratch location and the copy is queried,
    so this process never opens the live file. Candidate keys are tried in
    priority order (Composer data first, then the legacy chat-history keys);
    the first stored value longer than 100 characters is returned.

    Args:
        source_db: Database file to read. Defaults to the module-level
            ``db_path``.
        scratch_db: Location for the temporary copy. Defaults to the
            module-level ``temp_db_path``.

    Returns:
        The stored value exactly as SQLite returns it (``str`` or ``bytes``),
        or ``None`` when the database is missing, no candidate key holds a
        long-enough value, or reading fails.
    """
    source = Path(db_path if source_db is None else source_db)
    scratch = Path(temp_db_path if scratch_db is None else scratch_db)

    # Priority: Composer data > legacy chat data.
    potential_keys = (
        'composer.composerData',
        'workbench.panel.ai.chat.history',
        'cursor.chat.history',
    )

    copied = False
    try:
        if not source.exists():
            print(f"❌ 找不到数据库: {source}")
            return None

        # Work on a copy so a lock held by Cursor cannot block the query.
        # NOTE(review): only the main file is copied; if the database runs in
        # WAL mode, very recent writes may still sit in the "-wal" sidecar
        # file and be missed here -- confirm against a live installation.
        shutil.copy2(source, scratch)
        copied = True

        conn = sqlite3.connect(str(scratch))
        try:
            cursor = conn.cursor()
            for key in potential_keys:
                cursor.execute(
                    "SELECT value FROM ItemTable WHERE key = ? LIMIT 1", (key,)
                )
                row = cursor.fetchone()
                value = row[0] if row else None
                # Skip NULL/non-text values and short "empty state" payloads.
                if isinstance(value, (str, bytes)) and len(value) > 100:
                    return value
            return None
        finally:
            # Close the connection even if a query raised.
            conn.close()
    except (sqlite3.Error, OSError) as e:
        print(f"读取出错: {e}")
        return None
    finally:
        # Remove the scratch copy on every exit path, but only if this call
        # created it, so a failed copy never deletes an unrelated file.
        if copied:
            try:
                os.remove(scratch)
            except OSError:
                pass  # best-effort cleanup; the next run overwrites the copy
def extract_content(raw_json):
    """Parse Cursor's stored chat JSON and return the latest conversation.

    Two layouts are recognised:

    * Composer data: ``{"allComposers": [{"conversation": [...]}, ...]}``,
      where an entry whose ``type`` equals ``1`` is a user message.
    * Legacy chat data: ``{"tabs": [{"bubbles": [...]}, ...]}``, where an
      entry whose ``type`` equals ``"user"`` is a user message.

    Only the last session (composer or tab) is read, and only each entry's
    ``text`` field is used; entries with empty or non-string text are skipped.

    Args:
        raw_json: JSON document as ``str`` or ``bytes``.

    Returns:
        A list of ``{"role": "USER" | "AI", "content": str}`` dicts in stored
        order. Empty when the input is not valid JSON, has an unrecognised
        layout, or contains no sessions.
    """
    try:
        data = json.loads(raw_json)
    except (TypeError, ValueError):
        # TypeError: not str/bytes (e.g. None); ValueError: malformed JSON.
        return []

    if not isinstance(data, dict):
        return []

    # Layout 1: Composer data takes precedence over the legacy layout.
    if 'allComposers' in data:
        return _collect_messages(data['allComposers'], 'conversation', 1)
    # Layout 2: legacy chat data ("tabs" mode).
    if 'tabs' in data:
        return _collect_messages(data['tabs'], 'bubbles', 'user')
    return []


def _collect_messages(sessions, entries_key, user_marker):
    """Return role/content dicts for the entries of the last item in *sessions*."""
    if not isinstance(sessions, list) or not sessions:
        return []

    latest = sessions[-1]
    if not isinstance(latest, dict):
        return []

    entries = latest.get(entries_key)
    if not isinstance(entries, list):
        return []

    messages = []
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        text = entry.get('text', '')
        # Callers slice and strip the content, so keep real strings only.
        if isinstance(text, str) and text:
            role = "USER" if entry.get('type') == user_marker else "AI"
            messages.append({"role": role, "content": text})
    return messages
def main():
    """Poll Cursor's chat storage and forward every new snapshot to the server.

    Loops until interrupted. Every 3 seconds the raw chat record is re-read;
    when it differs from the previously seen snapshot and yields messages, the
    last two messages are printed (content truncated to 200 characters) and
    the whole conversation is POSTed to ``server_url``. A failed upload is
    reported and does not stop the loop.
    """
    last_snapshot = ""
    print("🚀 Cursor 监控已启动 (Mac版)")
    print(f"📂 监听路径: {db_path}")
    print("💡 请在 Cursor 中输入消息,稍等片刻即可看到打印...")

    while True:
        raw_data = get_latest_chat_data()
        if raw_data and raw_data != last_snapshot:
            messages = extract_content(raw_data)
            if messages:
                print("\n" + "=" * 50)
                print(f"⏰ 更新时间: {time.strftime('%H:%M:%S')}")
                # Show the last two messages to give some context.
                for m in messages[-2:]:
                    role_icon = "👤" if m['role'] == "USER" else "🤖"
                    print(f"{role_icon} {m['role']}: {m['content'][:200].strip()}...")
                print("=" * 50 + "\n")

                # Forward the full conversation to the server.
                try:
                    requests.post(
                        server_url,
                        json={"history": messages, "source": "cursor-mac"},
                        timeout=5,
                    )
                except requests.RequestException as e:
                    # Catch network errors only: a bare except here would also
                    # swallow Ctrl+C pressed while the request is in flight.
                    print(f"⚠️ 转发失败: {e}")
            # Remember the snapshot even when nothing could be extracted, so
            # identical data is not re-parsed on every poll.
            last_snapshot = raw_data
        time.sleep(3)  # check for new disk writes every 3 seconds
if __name__ == "__main__":
    # Script entry point: run the monitor until the user presses Ctrl+C,
    # then print a goodbye line instead of a traceback.
    try:
        main()
    except KeyboardInterrupt:
        print("\n👋 监控已停止")