diff --git a/api/export_api.py b/api/export_api.py
index 2cff8f9..4c8ddf2 100644
--- a/api/export_api.py
+++ b/api/export_api.py
@@ -7,20 +7,29 @@
import io
import json
import os
-import re
import sqlite3
import zipfile
-from contextlib import closing
from datetime import datetime
from pathlib import Path
from flask import Blueprint, Response, current_app, jsonify, request
from utils.workspace_path import resolve_workspace_path
-from utils.path_helpers import get_workspace_folder_paths, to_epoch_ms
-from utils.text_extract import extract_text_from_bubble
-from utils.tool_parser import parse_tool_call
+from utils.path_helpers import to_epoch_ms
+from utils.text_extract import extract_text_from_bubble, slug
from utils.exclusion_rules import build_searchable_text, is_excluded_by_rules
+from utils.cursor_md_exporter import cursor_ide_chat_to_markdown
+from services.workspace_db import (
+ _build_composer_id_to_workspace_id,
+ _collect_workspace_entries,
+ load_bubble_map,
+ load_code_block_diff_map,
+ _open_global_db,
+)
+from services.workspace_resolver import (
+ _get_workspace_display_name,
+ _create_project_name_to_workspace_id_map,
+)
bp = Blueprint("export_api", __name__)
@@ -54,14 +63,6 @@ def _save_export_state(count: int):
json.dump(state, f, indent=2)
-def _slug(s: str) -> str:
- s = re.sub(r'[<>:"/\\|?*]', "_", s or "")
- s = re.sub(r"\s+", "-", s)
- s = re.sub(r"-+", "-", s)
- s = s.strip("-")
- return s[:80] or "untitled"
-
-
@bp.route("/api/export/state")
def get_export_state():
"""Return the last export timestamp."""
@@ -78,21 +79,11 @@ def export_chats():
application startup; an app restart is required to pick up changes to the
exclusion rules file.
"""
- # Outer try/finally guarantees the global-storage connection is closed
- # on every exit path including unexpected exceptions (issue #17). Keeps
- # the existing function body shape; just ensures cleanup.
- conn = None
try:
body = request.get_json(silent=True) or {}
since = "last" if body.get("since") == "last" else "all"
workspace_path = resolve_workspace_path()
- global_db_path = os.path.normpath(
- os.path.join(workspace_path, "..", "globalStorage", "state.vscdb")
- )
-
- if not os.path.isfile(global_db_path):
- return jsonify({"error": "Cursor global storage not found"}), 404
# Determine last export timestamp for filtering
last_export_ms = 0
@@ -102,309 +93,95 @@ def export_chats():
if ts_str:
last_export_ms = to_epoch_ms(ts_str)
- conn = sqlite3.connect(f"file:{global_db_path}?mode=ro", uri=True)
- conn.row_factory = sqlite3.Row
-
- # Build workspace mapping
- from urllib.parse import unquote as _url_unquote
- workspace_entries = []
- ws_id_to_slug = {}
- ws_id_to_display_name = {} # human-readable, URL-decoded folder name
- for name in os.listdir(workspace_path):
- full = os.path.join(workspace_path, name)
- wj = os.path.join(full, "workspace.json")
- if os.path.isdir(full) and os.path.isfile(wj):
- workspace_entries.append({"name": name, "path": wj})
- try:
- with open(wj, "r", encoding="utf-8") as f:
- wd = json.load(f)
- folders = get_workspace_folder_paths(wd)
- first_folder = folders[0] if folders else None
- if isinstance(first_folder, str) and first_folder:
- fn = first_folder.replace("\\", "/").split("/")[-1]
- if fn:
- ws_id_to_slug[name] = _slug(fn)
- ws_id_to_display_name[name] = _url_unquote(fn)
- except Exception:
- pass
-
- # Build composer → workspace from per-workspace dbs
- composer_id_to_ws = {}
- for entry in workspace_entries:
- db_path = os.path.join(workspace_path, entry["name"], "state.vscdb")
- if not os.path.isfile(db_path):
- continue
- try:
- # closing() guarantees .close() on scope exit (issue #17).
- with closing(sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)) as wconn:
- row = wconn.execute(
- "SELECT value FROM ItemTable WHERE [key] = 'composer.composerData'"
- ).fetchone()
- if row and row[0]:
- data = json.loads(row[0])
- for c in (data.get("allComposers") or []):
- cid = c.get("composerId") if isinstance(c, dict) else None
- if cid:
- composer_id_to_ws[cid] = entry["name"]
- except Exception:
- pass
-
- # Load bubble data for text extraction
- bubble_map = {}
- for row in conn.execute("SELECT key, value FROM cursorDiskKV WHERE key LIKE 'bubbleId:%'"):
- parts = row["key"].split(":")
- if len(parts) >= 3:
- bid = parts[2]
- try:
- b = json.loads(row["value"])
- if isinstance(b, dict):
- bubble_map[bid] = b
- except Exception:
- pass
-
- # Process composers
- composer_rows = conn.execute(
- "SELECT key, value FROM cursorDiskKV WHERE key LIKE 'composerData:%'"
- " AND value LIKE '%fullConversationHeadersOnly%'"
- " AND value NOT LIKE '%fullConversationHeadersOnly\":[]%'"
- ).fetchall()
+ # ── Workspace scanning via service layer ──────────────────────────────
+ workspace_entries = _collect_workspace_entries(workspace_path)
+ composer_id_to_ws = _build_composer_id_to_workspace_id(workspace_path, workspace_entries)
+ project_name_map = _create_project_name_to_workspace_id_map(workspace_entries)
+
+ # Build display-name and slug maps
+ ws_id_to_slug: dict[str, str] = {}
+ ws_id_to_display_name: dict[str, str] = {}
+ for e in workspace_entries:
+ display = _get_workspace_display_name(workspace_path, e["name"])
+ if display != e["name"]:
+ ws_id_to_display_name[e["name"]] = display
+ ws_id_to_slug[e["name"]] = slug(display)
today = datetime.now().strftime("%Y-%m-%d")
exported = []
rules = current_app.config.get("EXCLUSION_RULES") or []
- for row in composer_rows:
- composer_id = row["key"].split(":")[1]
+ # ── Database reading via service layer ────────────────────────────────
+ with _open_global_db(workspace_path) as (global_db, global_db_path):
+ if global_db is None:
+ return jsonify({"error": "Cursor global storage not found"}), 404
+
+ bubble_map = load_bubble_map(global_db)
+ code_block_diff_map = load_code_block_diff_map(global_db)
+
try:
- cd = json.loads(row["value"])
- headers = cd.get("fullConversationHeadersOnly") or []
- if not headers:
- continue
-
- updated_at_ms = to_epoch_ms(cd.get("lastUpdatedAt")) or to_epoch_ms(cd.get("createdAt")) or 0
- if since == "last" and updated_at_ms and updated_at_ms <= last_export_ms:
- continue
-
- ws_id = composer_id_to_ws.get(composer_id, "global")
- ws_slug = "other-chats" if ws_id == "global" else (ws_id_to_slug.get(ws_id) or _slug(ws_id[:12]))
- ws_display_name = "Other chats" if ws_id == "global" else (ws_id_to_display_name.get(ws_id) or ws_slug)
- title = cd.get("name") or f"Chat {composer_id[:8]}"
- model_config = cd.get("modelConfig") or {}
- model_name = model_config.get("modelName")
- model_names = [model_name] if model_name and model_name != "default" else None
- bubble_texts = []
- for h in headers:
- b = bubble_map.get(h.get("bubbleId"))
- if not b:
+ composer_rows = global_db.execute(
+ "SELECT key, value FROM cursorDiskKV WHERE key LIKE 'composerData:%'"
+ " AND value LIKE '%fullConversationHeadersOnly%'"
+ " AND value NOT LIKE '%fullConversationHeadersOnly\":[]%'"
+ ).fetchall()
+ except sqlite3.Error:
+ composer_rows = []
+
+ for row in composer_rows:
+ composer_id = row["key"].split(":")[1]
+ try:
+ cd = json.loads(row["value"])
+ headers = cd.get("fullConversationHeadersOnly") or []
+ if not headers:
continue
- bt = extract_text_from_bubble(b)
- if bt:
- bubble_texts.append(bt)
- searchable = build_searchable_text(
- project_name=ws_display_name,
- chat_title=title,
- model_names=model_names,
- chat_content_snippet="\n\n".join(bubble_texts) if bubble_texts else None,
- )
- if is_excluded_by_rules(rules, searchable):
- continue
- title_slug = _slug(title)
- ts_ms = updated_at_ms or int(datetime.now().timestamp() * 1000)
- ts_str = datetime.fromtimestamp(ts_ms / 1000).strftime("%Y-%m-%dT%H-%M-%S")
- filename = f"{ts_str}__{title_slug}__{composer_id[:8]}.md"
- rel_path = os.path.join(today, ws_slug, "chat", filename)
-
- # Build markdown content
- bubbles = []
- for h in headers:
- bid = h.get("bubbleId")
- b = bubble_map.get(bid)
- if not b:
+
+ updated_at_ms = to_epoch_ms(cd.get("lastUpdatedAt")) or to_epoch_ms(cd.get("createdAt")) or 0
+ if since == "last" and updated_at_ms and updated_at_ms <= last_export_ms:
continue
- text = extract_text_from_bubble(b)
- has_tool = isinstance(b.get("toolFormerData"), dict)
- has_thinking = bool(b.get("thinking"))
- if not text.strip() and not has_tool and not has_thinking:
+
+ ws_id = composer_id_to_ws.get(composer_id, "global")
+ ws_slug = "other-chats" if ws_id == "global" else (ws_id_to_slug.get(ws_id) or slug(ws_id[:12]))
+ ws_display_name = "Other chats" if ws_id == "global" else (ws_id_to_display_name.get(ws_id) or ws_slug)
+ title = cd.get("name") or f"Chat {composer_id[:8]}"
+ model_config = cd.get("modelConfig") or {}
+ model_name = model_config.get("modelName")
+ model_names = [model_name] if model_name and model_name != "default" else None
+
+ bubble_texts = []
+ for h in headers:
+ b = bubble_map.get(h.get("bubbleId"))
+ if b:
+ bt = extract_text_from_bubble(b)
+ if bt:
+ bubble_texts.append(bt)
+
+ searchable = build_searchable_text(
+ project_name=ws_display_name,
+ chat_title=title,
+ model_names=model_names,
+ chat_content_snippet="\n\n".join(bubble_texts) if bubble_texts else None,
+ )
+ if is_excluded_by_rules(rules, searchable):
continue
- if not text.strip() and has_tool:
- text = f"**Tool: {b['toolFormerData'].get('name', 'unknown')}**"
-
- btype = "user" if h.get("type") == 1 else "assistant"
- bubble_ts = to_epoch_ms(b.get("createdAt")) or to_epoch_ms(b.get("timestamp")) or 0
-
- thinking = None
- thinking_duration_ms = None
- if b.get("thinking"):
- thinking = b["thinking"] if isinstance(b["thinking"], str) else (
- b["thinking"].get("text") if isinstance(b["thinking"], dict) else None
- )
- thinking_duration_ms = b.get("thinkingDurationMs")
-
- # Full tool call parsing with input/output
- tool_info = None
- if has_tool:
- tool_info = parse_tool_call(b["toolFormerData"])
-
- # Per-bubble model info
- model_info = (b.get("modelInfo") or {}).get("modelName")
- if model_info == "default":
- model_info = None
-
- # Context window from user bubbles
- ctx_window = b.get("contextWindowStatusAtCreation") or {}
- ctx_tokens_used = ctx_window.get("tokensUsed", 0)
- ctx_token_limit = ctx_window.get("tokenLimit", 0)
- ctx_pct_remaining = ctx_window.get("percentageRemainingFloat") or ctx_window.get("percentageRemaining")
-
- # Token counts (AI bubbles only)
- tc_dict = (b.get("tokenCount") or {}) if btype == "assistant" else {}
- in_tok = tc_dict.get("inputTokens") or 0
- out_tok = tc_dict.get("outputTokens") or 0
- cached_tok = tc_dict.get("cachedTokens") or 0
-
- bubbles.append({
- "type": btype,
- "text": text,
- "timestamp": bubble_ts,
- "thinking": thinking,
- "thinkingDurationMs": thinking_duration_ms,
- "tool": tool_info,
- "model": model_info,
- "contextTokensUsed": ctx_tokens_used if ctx_tokens_used > 0 else None,
- "contextTokenLimit": ctx_token_limit if ctx_token_limit > 0 else None,
- "contextPctRemaining": round(ctx_pct_remaining, 1) if ctx_pct_remaining else None,
- "inputTokens": in_tok if in_tok > 0 else None,
- "outputTokens": out_tok if out_tok > 0 else None,
- "cachedTokens": cached_tok if cached_tok > 0 else None,
- })
-
- bubbles.sort(key=lambda x: x["timestamp"] or 0)
-
- # Compute response times
- last_user_ts = None
- for b_item in bubbles:
- if b_item["type"] == "user":
- last_user_ts = b_item.get("timestamp")
- elif b_item["type"] == "assistant" and last_user_ts:
- bts = b_item.get("timestamp")
- if bts and bts > last_user_ts:
- b_item["responseTimeMs"] = bts - last_user_ts
-
- # Aggregated metrics
- total_response_ms = sum(b_item.get("responseTimeMs", 0) for b_item in bubbles)
- total_thinking_ms = sum(b_item.get("thinkingDurationMs", 0) or 0 for b_item in bubbles)
- total_tool_calls = sum(1 for b_item in bubbles if b_item.get("tool"))
- lines_added = cd.get("totalLinesAdded", 0)
- lines_removed = cd.get("totalLinesRemoved", 0)
- files_added = cd.get("addedFiles", 0)
- files_removed = cd.get("removedFiles", 0)
- max_ctx_used = max((b_item.get("contextTokensUsed", 0) or 0) for b_item in bubbles) if bubbles else 0
- ctx_limit = max((b_item.get("contextTokenLimit", 0) or 0) for b_item in bubbles) if bubbles else 0
- total_input_tokens = sum(b_item.get("inputTokens") or 0 for b_item in bubbles)
- total_output_tokens = sum(b_item.get("outputTokens") or 0 for b_item in bubbles)
- total_cached_tokens = sum(b_item.get("cachedTokens") or 0 for b_item in bubbles)
- usage_data = cd.get("usageData") or {}
- total_cost_raw = usage_data.get("cost") or usage_data.get("estimatedCost")
- total_cost = total_cost_raw if isinstance(total_cost_raw, (int, float)) and total_cost_raw > 0 else None
-
- # Build frontmatter
- created_ms = to_epoch_ms(cd.get("createdAt")) or ts_ms
- md = "---\n"
- md += f"log_id: {composer_id}\n"
- md += f"title: {title}\n"
- md += f"created_at: {datetime.fromtimestamp(created_ms / 1000).isoformat()}\n"
- md += f"updated_at: {datetime.fromtimestamp(updated_at_ms / 1000).isoformat() if updated_at_ms else datetime.now().isoformat()}\n"
- md += f"workspace: {ws_slug}\n"
- md += f"workspace_name: {ws_display_name}\n"
- md += f"message_count: {len(bubbles)}\n"
- if model_name:
- md += f"model: {model_name}\n"
- if total_input_tokens:
- md += f"total_input_tokens: {total_input_tokens}\n"
- if total_output_tokens:
- md += f"total_output_tokens: {total_output_tokens}\n"
- if total_cached_tokens:
- md += f"total_cached_tokens: {total_cached_tokens}\n"
- if total_cost:
- md += f"total_cost_usd: {total_cost:.6f}\n"
- if total_response_ms:
- md += f"total_response_time_sec: {total_response_ms / 1000:.1f}\n"
- if total_thinking_ms:
- md += f"total_thinking_time_sec: {total_thinking_ms / 1000:.1f}\n"
- if total_tool_calls:
- md += f"total_tool_calls: {total_tool_calls}\n"
- if max_ctx_used and ctx_limit:
- md += f"max_context_tokens_used: {max_ctx_used}\n"
- md += f"context_token_limit: {ctx_limit}\n"
- if lines_added or lines_removed:
- md += f"lines_added: {lines_added}\n"
- md += f"lines_removed: {lines_removed}\n"
- if files_added or files_removed:
- md += f"files_added: {files_added}\n"
- md += f"files_removed: {files_removed}\n"
- md += "---\n\n"
- md += f"# {title}\n\n"
- md += f"_Created: {datetime.fromtimestamp(created_ms / 1000).strftime('%Y-%m-%d %H:%M:%S')}_\n\n"
- md += "---\n\n"
-
- for bubble in bubbles:
- role_label = "User" if bubble["type"] == "user" else "Assistant"
- md += f"### {role_label}\n\n"
- # Bubble metadata line
- meta_parts = []
- if bubble.get("model"):
- meta_parts.append(f"Model: {bubble['model']}")
- if bubble.get("inputTokens") or bubble.get("outputTokens"):
- tok_parts = []
- if bubble.get("inputTokens"):
- tok_parts.append(f"In: {bubble['inputTokens']:,}")
- if bubble.get("outputTokens"):
- tok_parts.append(f"Out: {bubble['outputTokens']:,}")
- if bubble.get("cachedTokens"):
- tok_parts.append(f"Cached: {bubble['cachedTokens']:,}")
- meta_parts.append(" / ".join(tok_parts))
- if bubble.get("responseTimeMs"):
- meta_parts.append(f"Response: {bubble['responseTimeMs'] / 1000:.1f}s")
- if bubble.get("thinkingDurationMs"):
- meta_parts.append(f"Thinking: {bubble['thinkingDurationMs'] / 1000:.1f}s")
- if bubble.get("contextTokensUsed") and bubble.get("contextTokenLimit"):
- pct = bubble["contextTokensUsed"] / bubble["contextTokenLimit"] * 100
- meta_parts.append(f"Context: {bubble['contextTokensUsed']:,} / {bubble['contextTokenLimit']:,} tokens ({pct:.0f}% used)")
- elif bubble.get("contextPctRemaining") is not None:
- meta_parts.append(f"Context: {bubble['contextPctRemaining']}% remaining")
- if meta_parts:
- md += f"_{' | '.join(meta_parts)}_\n\n"
- if bubble["timestamp"]:
- md += f"_{datetime.fromtimestamp(bubble['timestamp'] / 1000).strftime('%Y-%m-%d %H:%M:%S')}_\n\n"
- if bubble.get("thinking"):
- dur_str = f" ({bubble['thinkingDurationMs'] / 1000:.1f}s)" if bubble.get("thinkingDurationMs") else ""
- md += f"<details><summary>Thinking{dur_str}</summary>\n\n{bubble['thinking']}\n\n</details>\n\n"
- md += bubble["text"] + "\n\n"
- # Full tool call with input/output
- if bubble.get("tool"):
- t = bubble["tool"]
- tool_name = t.get("name") or "unknown"
- tool_status = t.get("status") or ""
- tool_summary = t.get("summary") or tool_name
- status_str = f" ({tool_status})" if tool_status else ""
- md += f"> **Tool: {tool_summary}**{status_str}\n"
- if t.get("input"):
- md += ">\n> **INPUT:**\n> ```\n"
- for iline in str(t["input"]).split("\n"):
- md += f"> {iline}\n"
- md += "> ```\n"
- if t.get("output"):
- md += ">\n> **OUTPUT:**\n> ```\n"
- for oline in str(t["output"]).split("\n"):
- md += f"> {oline}\n"
- md += "> ```\n"
- md += "\n"
- md += "---\n\n"
-
- exported.append({"path": rel_path, "content": md, "updatedAt": updated_at_ms})
-
- except Exception as e:
- print(f"Error processing composer {composer_id} for export: {e}")
+
+ title_slug = slug(title)
+ ts_ms = updated_at_ms or int(datetime.now().timestamp() * 1000)
+ ts_str = datetime.fromtimestamp(ts_ms / 1000).strftime("%Y-%m-%dT%H-%M-%S")
+ filename = f"{ts_str}__{title_slug}__{composer_id[:8]}.md"
+ rel_path = os.path.join(today, ws_slug, "chat", filename)
+
+ md = cursor_ide_chat_to_markdown(
+ composer_data=cd,
+ composer_id=composer_id,
+ bubble_map=bubble_map,
+ code_block_diff_map=code_block_diff_map,
+ workspace_info={"ws_slug": ws_slug, "ws_display_name": ws_display_name},
+ )
+ exported.append({"path": rel_path, "content": md, "updatedAt": updated_at_ms})
+
+ except Exception as e:
+ print(f"Error processing composer {composer_id} for export: {e}")
count = len(exported)
if count == 0:
@@ -412,15 +189,12 @@ def export_chats():
" since last export" if since == "last" else ""
)}), 404
- # Build zip in memory
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
for entry in exported:
zf.writestr(entry["path"], entry["content"])
buf.seek(0)
-
- # Save export state
_save_export_state(count)
filename = "cursor-export.zip"
@@ -438,8 +212,3 @@ def export_chats():
import traceback
traceback.print_exc()
return jsonify({"error": f"Export failed: {str(e)}"}), 500
- finally:
- # Guaranteed close — fires on success, exception, AND on any
- # in-body return that doesn't go through except (issue #17).
- if conn is not None:
- conn.close()
diff --git a/api/workspaces.py b/api/workspaces.py
index 64318d2..e778993 100644
--- a/api/workspaces.py
+++ b/api/workspaces.py
@@ -16,7 +16,7 @@
from utils.workspace_path import resolve_workspace_path, get_cli_chats_path
from utils.cli_chat_reader import list_cli_projects
from utils.path_helpers import get_workspace_folder_paths, get_workspace_display_name
-from utils.workspace_descriptor import _read_json_file
+from utils.workspace_descriptor import read_json_file
from services.workspace_resolver import (
_infer_workspace_name_from_context,
# Re-exported for back-compat with existing tests that import from api.workspaces
@@ -107,7 +107,7 @@ def get_workspace(workspace_id):
folder = None
workspace_name = workspace_id
try:
- wd = _read_json_file(wj_path)
+ wd = read_json_file(wj_path)
folder_paths = get_workspace_folder_paths(wd)
folder = folder_paths[0] if folder_paths else wd.get("folder")
derived_name = get_workspace_display_name(wd)
diff --git a/scripts/export.py b/scripts/export.py
index 46cc1bd..f4f93a2 100644
--- a/scripts/export.py
+++ b/scripts/export.py
@@ -4,46 +4,72 @@
Usage: python scripts/export.py [--since all|last] [--out DIR] [--no-zip] [--no-composer]
Run with --help for full usage information.
Env: WORKSPACE_PATH for Cursor workspaceStorage path.
+
+When the package is installed via ``pip install -e .`` (or ``pip install .``),
+this module is importable as ``scripts.export`` without any sys.path hacks.
+The guard below is only necessary for direct invocation (``python scripts/export.py``).
"""
import json
import logging
import os
-import re
import sqlite3
import sys
import zipfile
from datetime import datetime
from pathlib import Path
-from urllib.parse import unquote as _url_unquote
-# Ensure project root is on path when run as python scripts/export.py
-_project_root = Path(__file__).resolve().parent.parent
-if str(_project_root) not in sys.path:
- sys.path.insert(0, str(_project_root))
+# sys.path guard: only needed when the script is invoked directly
+# (``python scripts/export.py``). When installed via the pyproject.toml
+# entry point (``cursor-chat-export``) or imported as a module, the
+# project root is already on sys.path.
+if __name__ == "__main__":
+ _project_root = Path(__file__).resolve().parent.parent
+ if str(_project_root) not in sys.path:
+ sys.path.insert(0, str(_project_root))
-# noqa: E402 — these imports must come after the sys.path.insert above so the
-# script can be run directly as `python scripts/export.py` from anywhere.
from utils.exclusion_rules import ( # noqa: E402
resolve_exclusion_rules_path,
load_rules,
build_searchable_text,
is_excluded_by_rules,
)
-from utils.path_helpers import ( # noqa: E402
- get_workspace_folder_paths as _shared_get_workspace_folder_paths,
- normalize_file_path,
- to_epoch_ms,
+from utils.path_helpers import to_epoch_ms # noqa: E402
+from utils.text_extract import ( # noqa: E402
+ extract_text_from_bubble,
+ slug,
)
from utils.tool_parser import parse_tool_call # noqa: E402
-from utils.workspace_path import get_cli_chats_path # noqa: E402
+from utils.workspace_path import ( # noqa: E402
+ get_cli_chats_path,
+ resolve_workspace_path,
+)
from utils.cli_chat_reader import ( # noqa: E402
list_cli_projects,
traverse_blobs,
messages_to_bubbles,
)
-from utils.cursor_md_exporter import cursor_cli_session_to_markdown # noqa: E402
+from utils.cursor_md_exporter import ( # noqa: E402
+ cursor_cli_session_to_markdown,
+ cursor_ide_chat_to_markdown,
+)
from models import ExportEntry, SchemaError # noqa: E402
+from services.workspace_db import ( # noqa: E402
+ _build_composer_id_to_workspace_id,
+ _collect_invalid_workspace_ids,
+ _collect_workspace_entries,
+ load_bubble_map,
+ load_code_block_diff_map,
+ load_project_layouts_map,
+ _open_global_db,
+)
+from services.workspace_resolver import ( # noqa: E402
+ _determine_project_for_conversation,
+ _get_workspace_display_name,
+ _infer_invalid_workspace_aliases,
+ _create_project_name_to_workspace_id_map,
+ _create_workspace_path_to_id_map,
+)
_logger = logging.getLogger(__name__)
@@ -87,53 +113,6 @@ def _write_manifest_entries(manifest_path: str, entries_by_id: dict):
f.write(json.dumps(entry) + "\n")
-def get_default_workspace_path() -> str:
- home = str(Path.home())
- release = ""
- try:
- release = os.uname().release.lower()
- except AttributeError:
- pass
- is_wsl = "microsoft" in release or "wsl" in release
- is_remote = bool(
- os.environ.get("SSH_CONNECTION")
- or os.environ.get("SSH_CLIENT")
- or os.environ.get("SSH_TTY")
- )
-
- if is_wsl:
- import subprocess
- username = os.getenv("USER", "")
- try:
- username = subprocess.check_output(
- ["cmd.exe", "/c", "echo", "%USERNAME%"],
- text=True,
- stderr=subprocess.DEVNULL,
- ).strip()
- except Exception:
- pass
- return f"/mnt/c/Users/{username}/AppData/Roaming/Cursor/User/workspaceStorage"
-
- if sys.platform == "win32":
- return os.path.join(home, "AppData", "Roaming", "Cursor", "User", "workspaceStorage")
- elif sys.platform == "darwin":
- return os.path.join(home, "Library", "Application Support", "Cursor", "User", "workspaceStorage")
- elif sys.platform == "linux":
- if is_remote:
- return os.path.join(home, ".cursor-server", "data", "User", "workspaceStorage")
- return os.path.join(home, ".config", "Cursor", "User", "workspaceStorage")
- return os.path.join(home, "workspaceStorage")
-
-
-def resolve_workspace_path() -> str:
- env = os.environ.get("WORKSPACE_PATH", "").strip()
- if env:
- if env.startswith("~/"):
- return os.path.join(str(Path.home()), env[2:])
- return env
- return get_default_workspace_path()
-
-
def get_global_state_dir() -> str:
# Honor XDG_STATE_HOME when set so the export state file (and manifest)
# can be redirected — required for hermetic test runs and useful for
@@ -145,55 +124,6 @@ def get_global_state_dir() -> str:
return os.path.join(str(Path.home()), ".cursor-chat-browser")
-def slug(s: str) -> str:
- s = re.sub(r'[<>:"/\\|?*]', "_", s or "")
- s = re.sub(r"\s+", "-", s)
- s = re.sub(r"-+", "-", s)
- s = s.strip("-")
- return s[:80] or "untitled"
-
-
-def extract_text_from_rich_text(children) -> str:
- if not isinstance(children, list):
- return ""
- t = ""
- for c in children:
- if not isinstance(c, dict):
- continue
- if c.get("type") == "text" and c.get("text"):
- t += c["text"]
- elif c.get("type") == "code" and c.get("children"):
- t += "\n```\n" + extract_text_from_rich_text(c["children"]) + "\n```\n"
- elif c.get("children"):
- t += extract_text_from_rich_text(c["children"])
- return t
-
-
-def extract_text_from_bubble(bubble) -> str:
- if not bubble or not isinstance(bubble, dict):
- return ""
- t = ""
- if bubble.get("text") and str(bubble["text"]).strip():
- t = bubble["text"]
- if not t and bubble.get("richText"):
- try:
- r = json.loads(bubble["richText"]) if isinstance(bubble["richText"], str) else bubble["richText"]
- if isinstance(r, dict) and r.get("root") and r["root"].get("children"):
- t = extract_text_from_rich_text(r["root"]["children"])
- except Exception:
- pass
- cbs = bubble.get("codeBlocks")
- if isinstance(cbs, list):
- for cb in cbs:
- if isinstance(cb, dict) and cb.get("content"):
- t += f"\n\n```{cb.get('language', '')}\n{cb['content']}\n```"
- return t
-
-
-def get_workspace_folder_paths(wd) -> list:
- return _shared_get_workspace_folder_paths(wd)
-
-
def parse_args():
import argparse
parser = argparse.ArgumentParser(
@@ -243,14 +173,13 @@ def main():
if opts.get("base_dir"):
os.environ["WORKSPACE_PATH"] = opts["base_dir"]
workspace_path = resolve_workspace_path()
- global_path = os.path.normpath(os.path.join(workspace_path, "..", "globalStorage", "state.vscdb"))
state_dir = get_global_state_dir()
state_path = os.path.join(state_dir, "export_state.json")
last_export = 0
if since == "last" and os.path.isfile(state_path):
try:
- with open(state_path, "r") as f:
+ with open(state_path, "r", encoding="utf-8") as f:
st = json.load(f)
ts = st.get("lastExportTime")
if ts:
@@ -258,209 +187,68 @@ def main():
except Exception:
pass
- # Pre-initialize IDE data — populated below only if the IDE database is accessible.
- workspace_entries: list = []
- workspace_path_to_id: dict = {}
- project_name_to_ws: dict = {}
- workspace_id_to_slug: dict = {}
+ # ── Workspace scanning via service layer ──────────────────────────────────
+ workspace_entries = _collect_workspace_entries(workspace_path)
+ invalid_workspace_ids = _collect_invalid_workspace_ids(workspace_entries)
+ project_name_map = _create_project_name_to_workspace_id_map(workspace_entries)
+ workspace_path_map = _create_workspace_path_to_id_map(workspace_entries)
+ composer_id_to_ws = _build_composer_id_to_workspace_id(workspace_path, workspace_entries)
+
+ # Build display-name and slug maps from workspace entries.
+ # Entries whose workspace.json cannot be resolved are omitted so the
+ # usage-site fallback (slug(ws_id[:12])) applies — matching original
+ # behaviour where unresolvable workspaces were skipped.
workspace_id_to_display_name: dict[str, str] = {}
+ workspace_id_to_slug: dict[str, str] = {}
+ for e in workspace_entries:
+ display = _get_workspace_display_name(workspace_path, e["name"])
+ if display != e["name"]: # successfully resolved a human-readable name
+ workspace_id_to_display_name[e["name"]] = display
+ workspace_id_to_slug[e["name"]] = slug(display)
+
+ # ── Database reading via service layer ────────────────────────────────────
project_layouts_map: dict = {}
bubble_map: dict = {}
code_block_diff_map: dict = {}
ide_composer_rows: list = []
+ invalid_workspace_aliases: dict = {}
+
+ with _open_global_db(workspace_path) as (global_db, global_db_path):
+ if global_db is None:
+ print(
+ f"Note: Cursor IDE global storage not found at {global_db_path}"
+ " — skipping IDE chats.",
+ file=sys.stderr,
+ )
+ else:
+ project_layouts_map = load_project_layouts_map(global_db)
+ bubble_map = load_bubble_map(global_db)
+ code_block_diff_map = load_code_block_diff_map(global_db)
- # Load IDE chat data — skipped gracefully when the database is absent or locked.
- if not os.path.isfile(global_path):
- print(f"Note: Cursor IDE global storage not found at {global_path} — skipping IDE chats.", file=sys.stderr)
- else:
- _conn = None
- try:
- _conn = sqlite3.connect(f"file:{global_path}?mode=ro", uri=True)
- _conn.row_factory = sqlite3.Row
-
- # Build workspace entries
- try:
- for name in os.listdir(workspace_path):
- full = os.path.join(workspace_path, name)
- if os.path.isdir(full):
- wp = os.path.join(full, "workspace.json")
- if os.path.isfile(wp):
- workspace_entries.append({"name": name, "workspaceJsonPath": wp})
- except Exception:
- pass
-
- for e in workspace_entries:
- try:
- with open(e["workspaceJsonPath"], "r", encoding="utf-8") as f:
- wd = json.load(f)
- folders = get_workspace_folder_paths(wd)
- first_folder = folders[0] if folders else None
- if isinstance(first_folder, str) and first_folder:
- fn = re.sub(r"^file://", "", first_folder).replace("\\", "/").split("/")[-1]
- if fn:
- workspace_id_to_slug[e["name"]] = slug(fn)
- workspace_id_to_display_name[e["name"]] = _url_unquote(fn)
- for folder in get_workspace_folder_paths(wd):
- norm = normalize_file_path(folder)
- workspace_path_to_id[norm] = e["name"]
- fn2 = re.sub(r"^file://", "", folder).replace("\\", "/").split("/")[-1]
- if fn2:
- project_name_to_ws[fn2] = e["name"]
- except Exception:
- pass
-
- # Project layouts
- try:
- for row in _conn.execute("SELECT key, value FROM cursorDiskKV WHERE key LIKE 'messageRequestContext:%'"):
- parts = row["key"].split(":")
- if len(parts) < 2:
- continue
- cid = parts[1]
- try:
- ctx = json.loads(row["value"])
- layouts = ctx.get("projectLayouts")
- if isinstance(layouts, list):
- project_layouts_map.setdefault(cid, [])
- for layout in layouts:
- try:
- o = json.loads(layout) if isinstance(layout, str) else layout
- if isinstance(o, dict) and o.get("rootPath"):
- project_layouts_map[cid].append(o["rootPath"])
- except Exception:
- pass
- except Exception:
- pass
- except Exception:
- pass
-
- # Bubble map
- try:
- for row in _conn.execute("SELECT key, value FROM cursorDiskKV WHERE key LIKE 'bubbleId:%'"):
- parts = row["key"].split(":")
- if len(parts) >= 3:
- bid = parts[2]
- try:
- b = json.loads(row["value"])
- if isinstance(b, dict):
- bubble_map[bid] = b
- except Exception:
- pass
- except Exception:
- pass
-
- # Code block diffs
try:
- for row in _conn.execute("SELECT key, value FROM cursorDiskKV WHERE key LIKE 'codeBlockDiff:%'"):
- parts = row["key"].split(":")
- cid = parts[1] if len(parts) > 1 else None
- if not cid:
- continue
- try:
- d = json.loads(row["value"])
- code_block_diff_map.setdefault(cid, []).append({
- **d,
- "diffId": parts[2] if len(parts) > 2 else None,
- })
- except Exception:
- pass
- except Exception:
+ ide_composer_rows = global_db.execute(
+ "SELECT key, value FROM cursorDiskKV WHERE key LIKE 'composerData:%'"
+ " AND value LIKE '%fullConversationHeadersOnly%'"
+ ).fetchall()
+ except sqlite3.Error:
pass
- ide_composer_rows = _conn.execute(
- "SELECT key, value FROM cursorDiskKV WHERE key LIKE 'composerData:%'"
- " AND value LIKE '%fullConversationHeadersOnly%'"
- ).fetchall()
- except Exception as e:
- print(f"Warning: Could not read Cursor IDE chats ({e}) — skipping.", file=sys.stderr)
- finally:
- # Guaranteed close on every exit path (issue #17). Replaces the
- # previous duplicate close-in-success-and-error pattern.
- if _conn is not None:
- try:
- _conn.close()
- except Exception:
- pass
-
- def get_project_from_file_path(fp):
- np = normalize_file_path(fp)
- best = None
- best_len = 0
- for e in workspace_entries:
- try:
- with open(e["workspaceJsonPath"], "r", encoding="utf-8") as f:
- wd = json.load(f)
- for folder in get_workspace_folder_paths(wd):
- wp = normalize_file_path(folder)
- if np.startswith(wp) and len(wp) > best_len:
- best_len = len(wp)
- best = e["name"]
- except Exception:
- pass
- return best
-
- def assign_workspace(cd, cid):
- # Try project layouts
- pl = project_layouts_map.get(cid, [])
- best_layout = None
- best_len = 0
- for rp in pl:
- match = get_project_from_file_path(rp)
- if match:
- nl = len(normalize_file_path(rp))
- if nl > best_len:
- best_len = nl
- best_layout = match
- if best_layout:
- return best_layout
-
- # Try file paths
- paths = []
- for fi in (cd.get("newlyCreatedFiles") or []):
- if isinstance(fi, dict) and fi.get("uri") and fi["uri"].get("path"):
- paths.append(normalize_file_path(fi["uri"]["path"]))
- for fp in (cd.get("codeBlockData") or {}).keys():
- paths.append(normalize_file_path(re.sub(r"^file://", "", fp)))
- for h in (cd.get("fullConversationHeadersOnly") or []):
- b = bubble_map.get(h.get("bubbleId"))
- if not b:
- continue
- for fp in (b.get("relevantFiles") or []):
- if fp:
- paths.append(normalize_file_path(fp))
- for u in (b.get("attachedFileCodeChunksUris") or []):
- if isinstance(u, dict) and u.get("path"):
- paths.append(normalize_file_path(u["path"]))
- for fs_entry in (b.get("context", {}).get("fileSelections") or []):
- if isinstance(fs_entry, dict) and isinstance(fs_entry.get("uri"), dict) and fs_entry["uri"].get("path"):
- paths.append(normalize_file_path(fs_entry["uri"]["path"]))
-
- sep = "\\" if sys.platform == "win32" else "/"
- best_id = None
- best_l = 0
- for p in paths:
- for e in workspace_entries:
- try:
- with open(e["workspaceJsonPath"], "r", encoding="utf-8") as f:
- wd = json.load(f)
- for folder in get_workspace_folder_paths(wd):
- fn = re.sub(r"^file://", "", folder).replace("\\", "/").split("/")[-1]
- if not fn:
- continue
- needle = sep + fn + sep
- needle_end = sep + fn
- if needle in p or p.endswith(needle_end):
- if len(fn) > best_l:
- best_l = len(fn)
- best_id = e["name"]
- except Exception:
- pass
- return best_id or "global"
+ invalid_workspace_aliases = _infer_invalid_workspace_aliases(
+ composer_rows=ide_composer_rows,
+ project_layouts_map=project_layouts_map,
+ project_name_map=project_name_map,
+ workspace_path_map=workspace_path_map,
+ workspace_entries=workspace_entries,
+ bubble_map=bubble_map,
+ composer_id_to_ws=composer_id_to_ws,
+ invalid_workspace_ids=invalid_workspace_ids,
+ )
today = datetime.now().strftime("%Y-%m-%d")
exported = []
count = 0
- # Process IDE composers (skipped entirely when --no-composer was passed)
+ # ── Process IDE composers ────────────────────────────────────────────────
include_composer = opts.get("include_composer", True)
for row in ide_composer_rows if include_composer else []:
composer_id = row["key"].split(":")[1]
@@ -477,7 +265,17 @@ def assign_workspace(cd, cid):
if since == "last" and updated_at <= last_export:
continue
- ws_id = assign_workspace(cd, composer_id)
+ # Workspace assignment via service layer
+ pid = _determine_project_for_conversation(
+ cd, composer_id, project_layouts_map,
+ project_name_map, workspace_path_map,
+ workspace_entries, bubble_map, composer_id_to_ws, invalid_workspace_ids,
+ )
+ mapped_ws = composer_id_to_ws.get(composer_id)
+ if not pid and mapped_ws in invalid_workspace_ids:
+ pid = invalid_workspace_aliases.get(mapped_ws)
+ ws_id = pid if pid else "global"
+
ws_slug = "other-chats" if ws_id == "global" else (workspace_id_to_slug.get(ws_id) or slug(ws_id[:12]))
ws_display_name = "Other chats" if ws_id == "global" else (workspace_id_to_display_name.get(ws_id) or ws_slug)
title = cd.get("name") or f"Chat {composer_id[:8]}"
@@ -510,294 +308,42 @@ def assign_workspace(cd, cid):
bubble_texts
+ bubble_meta_parts
+ code_diff_parts
- + [
- _json_dump_safe(model_config),
- _json_dump_safe(cd),
- ]
+ + [_json_dump_safe(model_config), _json_dump_safe(cd)]
)
if p
),
)
if is_excluded_by_rules(exclusion_rules, searchable):
continue
+
title_slug = slug(title)
ts = updated_at or int(datetime.now().timestamp() * 1000)
ts_str = datetime.fromtimestamp(ts / 1000).strftime("%Y-%m-%dT%H-%M-%S")
filename = f"{ts_str}__{title_slug}__{composer_id[:8]}.md"
- rel_dir = os.path.join(today, ws_slug, "chat")
- out_path = os.path.join(out_dir, rel_dir, filename)
-
- # Build bubbles with full metadata
- bubbles = []
- for h in headers:
- b = bubble_map.get(h.get("bubbleId"))
- if not b:
- continue
- text = extract_text_from_bubble(b)
- has_tool = isinstance(b.get("toolFormerData"), dict)
- has_thinking = bool(b.get("thinking"))
- if not text.strip() and not has_tool and not has_thinking:
- continue
- if not text.strip() and has_tool:
- text = f"**Tool: {b['toolFormerData'].get('name', 'unknown')}**"
-
- btype = "user" if h.get("type") == 1 else "ai"
-
- thinking = None
- thinking_duration_ms = None
- if b.get("thinking"):
- thinking = b["thinking"] if isinstance(b["thinking"], str) else (
- b["thinking"].get("text") if isinstance(b["thinking"], dict) else None
- )
- thinking_duration_ms = b.get("thinkingDurationMs")
-
- tool_info = None
- if has_tool:
- tool_info = parse_tool_call(b["toolFormerData"])
-
- model_info = (b.get("modelInfo") or {}).get("modelName")
- if model_info == "default":
- model_info = None
-
- ctx_window = b.get("contextWindowStatusAtCreation") or {}
- ctx_tokens_used = ctx_window.get("tokensUsed", 0)
- ctx_token_limit = ctx_window.get("tokenLimit", 0)
- ctx_pct_remaining = ctx_window.get("percentageRemainingFloat") or ctx_window.get("percentageRemaining")
-
- bubbles.append({
- "type": btype,
- "text": text,
- "timestamp": to_epoch_ms(b.get("createdAt")) or to_epoch_ms(b.get("timestamp")) or int(datetime.now().timestamp() * 1000),
- "tool": tool_info,
- "thinking": thinking,
- "thinkingDurationMs": thinking_duration_ms,
- "model": model_info,
- "contextTokensUsed": ctx_tokens_used if ctx_tokens_used > 0 else None,
- "contextTokenLimit": ctx_token_limit if ctx_token_limit > 0 else None,
- "contextPctRemaining": round(ctx_pct_remaining, 1) if ctx_pct_remaining else None,
- })
-
- # Code block diffs
- for d in code_block_diff_map.get(composer_id, []):
- bubbles.append({
- "type": "ai",
- "text": f"**Code edit:** {json.dumps(d)}",
- "timestamp": to_epoch_ms(cd.get("lastUpdatedAt")) or to_epoch_ms(cd.get("createdAt")) or int(datetime.now().timestamp() * 1000),
- })
-
- bubbles.sort(key=lambda bub: bub.get("timestamp") or 0)
-
- # Compute per-assistant-bubble response times
- last_user_ts = None
- for bub in bubbles:
- if bub["type"] == "user":
- last_user_ts = bub.get("timestamp")
- elif bub["type"] == "ai" and last_user_ts:
- bts = bub.get("timestamp")
- if bts and bts > last_user_ts:
- bub["responseTimeMs"] = bts - last_user_ts
-
- # Session-level aggregates
- total_response_ms = sum(bub.get("responseTimeMs", 0) for bub in bubbles)
- total_thinking_ms = sum(bub.get("thinkingDurationMs", 0) or 0 for bub in bubbles)
- total_tool_calls = sum(1 for bub in bubbles if bub.get("tool"))
- max_ctx_used = max((bub.get("contextTokensUsed") or 0) for bub in bubbles) if bubbles else 0
- ctx_limit = max((bub.get("contextTokenLimit") or 0) for bub in bubbles) if bubbles else 0
-
- tool_breakdown = {}
- for bub in bubbles:
- if bub.get("tool"):
- tn = bub["tool"].get("name", "unknown")
- tool_breakdown[tn] = tool_breakdown.get(tn, 0) + 1
-
- lines_added = cd.get("totalLinesAdded", 0)
- lines_removed = cd.get("totalLinesRemoved", 0)
-
- # Wall-clock duration from bubble timestamps
- ts_vals = [bub["timestamp"] for bub in bubbles if bub.get("timestamp")]
- wall_clock_sec = int((max(ts_vals) - min(ts_vals)) / 1000) if len(ts_vals) >= 2 else None
-
- # Collect file/command activity and tool result stats from tool calls
- files_read_list = []
- files_written_list = []
- commands_run_list = []
- tool_result_stats = {
- "terminal_success": 0, "terminal_error": 0,
- "file_reads": 0, "file_edits": 0,
- "searches": 0, "web": 0,
- }
- for bub in bubbles:
- if not bub.get("tool"):
- continue
- t = bub["tool"]
- tn = t.get("name", "")
- status = t.get("status") or ""
- raw_input = str(t.get("input") or "").strip()
- first_line = raw_input.split("\n")[0] if raw_input else ""
- if tn == "read_file_v2" and first_line:
- files_read_list.append(first_line)
- tool_result_stats["file_reads"] += 1
- elif tn == "edit_file_v2" and first_line:
- files_written_list.append(first_line)
- tool_result_stats["file_edits"] += 1
- elif tn == "run_terminal_command_v2" and raw_input:
- commands_run_list.append(raw_input)
- if status == "completed":
- tool_result_stats["terminal_success"] += 1
- elif status in ("error", "failed"):
- tool_result_stats["terminal_error"] += 1
- else:
- tool_result_stats["terminal_success"] += 1
- elif tn in ("ripgrep_raw_search", "glob_file_search", "semantic_search_full"):
- tool_result_stats["searches"] += 1
- elif tn in ("web_search", "web_fetch"):
- tool_result_stats["web"] += 1
-
- # Frontmatter
- created_ms = to_epoch_ms(cd.get("createdAt")) or ts
- fm_lines = ["---"]
- fm_lines.append(f"log_id: {composer_id}")
- fm_lines.append("log_type: chat")
- fm_lines.append(f'title: "{title.replace(chr(34), chr(92)+chr(34))}"')
- fm_lines.append(f"created_at: {datetime.fromtimestamp(created_ms / 1000).isoformat()}")
- fm_lines.append(f"updated_at: {datetime.fromtimestamp(updated_at / 1000).isoformat() if updated_at else datetime.now().isoformat()}")
- fm_lines.append(f"workspace: {ws_slug}")
- fm_lines.append(f'workspace_name: "{ws_display_name}"')
- if model_name and model_name != "default":
- fm_lines.append(f"model: {model_name}")
- fm_lines.append(f"message_count: {len(bubbles)}")
- if total_tool_calls:
- fm_lines.append(f"total_tool_calls: {total_tool_calls}")
- if tool_breakdown:
- fm_lines.append("tool_call_breakdown:")
- for tn, cnt in sorted(tool_breakdown.items(), key=lambda x: -x[1]):
- fm_lines.append(f" {tn}: {cnt}")
- total_think = sum(1 for bub in bubbles if bub.get("thinking"))
- if total_think:
- fm_lines.append(f"thinking_count: {total_think}")
- if wall_clock_sec is not None:
- fm_lines.append(f"wall_clock_seconds: {wall_clock_sec}")
- if total_response_ms:
- fm_lines.append(f"total_response_time_sec: {total_response_ms / 1000:.1f}")
- if total_thinking_ms:
- fm_lines.append(f"total_thinking_time_sec: {total_thinking_ms / 1000:.1f}")
- if max_ctx_used and ctx_limit:
- fm_lines.append(f"max_context_tokens_used: {max_ctx_used}")
- fm_lines.append(f"context_token_limit: {ctx_limit}")
- if lines_added or lines_removed:
- fm_lines.append(f"lines_added: {lines_added}")
- fm_lines.append(f"lines_removed: {lines_removed}")
- if files_read_list or files_written_list:
- fm_lines.append(f"files_read: {len(files_read_list)}")
- fm_lines.append(f"files_written: {len(files_written_list)}")
- if commands_run_list:
- fm_lines.append(f"commands_run: {len(commands_run_list)}")
- fm_lines.append("---")
- fm_str = "\n".join(fm_lines) + "\n\n"
-
- # Header
- header = f"# {title}\n\n"
- meta_parts = []
- if created_ms:
- meta_parts.append(f"Created: {datetime.fromtimestamp(created_ms / 1000).strftime('%Y-%m-%d %H:%M:%S')}")
- if model_name and model_name != "default":
- meta_parts.append(f"Model: {model_name}")
- if total_tool_calls:
- meta_parts.append(f"Tool calls: {total_tool_calls}")
- if wall_clock_sec is not None:
- hrs, rem = divmod(wall_clock_sec, 3600)
- mins, secs = divmod(rem, 60)
- dur = f"{hrs}h {mins}m" if hrs else (f"{mins}m {secs}s" if mins else f"{secs}s")
- meta_parts.append(f"Duration: {dur}")
- header += f"_{' | '.join(meta_parts)}_\n\n---\n\n" if meta_parts else "---\n\n"
-
- # Session summary block
- summary = ""
- if files_read_list or files_written_list or commands_run_list:
- summary += "## Session Summary\n\n"
- if files_written_list or files_read_list:
- summary += "### Files Touched\n\n"
- summary += "| Action | File |\n|--------|------|\n"
- for fp in files_written_list:
- summary += f"| Edit | `{fp}` |\n"
- for fp in files_read_list:
- summary += f"| Read | `{fp}` |\n"
- summary += "\n"
- if commands_run_list:
- summary += "### Commands Run\n\n"
- for i, cmd in enumerate(commands_run_list, 1):
- summary += f"{i}. `{cmd}`\n"
- summary += "\n"
- non_zero = {k: v for k, v in tool_result_stats.items() if v > 0}
- if non_zero:
- summary += "### Tool Results\n\n"
- labels = {
- "terminal_success": "Terminal Success",
- "terminal_error": "Terminal Error",
- "file_reads": "File Reads",
- "file_edits": "File Edits",
- "searches": "Searches",
- "web": "Web Fetches",
- }
- for k, v in non_zero.items():
- summary += f"- {labels.get(k, k)}: {v}\n"
- summary += "\n"
- summary += "---\n\n"
-
- # Body
- body = ""
- for bub in bubbles:
- role = "User" if bub["type"] == "user" else "Assistant"
- body += f"### {role}\n\n"
- # Per-message metadata line
- meta_parts = []
- if bub.get("model"):
- meta_parts.append(f"Model: {bub['model']}")
- if bub.get("responseTimeMs"):
- meta_parts.append(f"Response: {bub['responseTimeMs'] / 1000:.1f}s")
- if bub.get("thinkingDurationMs"):
- meta_parts.append(f"Thinking: {bub['thinkingDurationMs'] / 1000:.1f}s")
- if bub.get("contextTokensUsed") and bub.get("contextTokenLimit"):
- pct = bub["contextTokensUsed"] / bub["contextTokenLimit"] * 100
- meta_parts.append(f"Context: {bub['contextTokensUsed']:,} / {bub['contextTokenLimit']:,} tokens ({pct:.0f}% used)")
- elif bub.get("contextPctRemaining") is not None:
- meta_parts.append(f"Context: {bub['contextPctRemaining']}% remaining")
- if meta_parts:
- body += f"_{' | '.join(meta_parts)}_\n\n"
- if bub.get("timestamp"):
- body += f"_{datetime.fromtimestamp(bub['timestamp'] / 1000).isoformat()}_\n\n"
- if bub.get("thinking"):
- dur_str = f" ({bub['thinkingDurationMs'] / 1000:.1f}s)" if bub.get("thinkingDurationMs") else ""
- body += f"Thinking{dur_str}
\n\n{bub['thinking']}\n\n \n\n"
- body += bub["text"] + "\n\n"
- if bub.get("tool"):
- t = bub["tool"]
- tool_summary = t.get("summary") or t.get("name") or "unknown"
- tool_status = t.get("status") or ""
- status_str = f" ({tool_status})" if tool_status else ""
- body += f"> **Tool: {tool_summary}**{status_str}\n"
- if t.get("input"):
- body += "> **INPUT:**\n> ```\n"
- for iline in str(t["input"]).split("\n"):
- body += f"> {iline}\n"
- body += "> ```\n"
- if t.get("output"):
- body += "> **OUTPUT:**\n> ```\n"
- for oline in str(t["output"]).split("\n"):
- body += f"> {oline}\n"
- body += "> ```\n"
- body += "\n"
- body += "---\n\n"
-
- md = fm_str + header + summary + body
+ out_path = os.path.join(out_dir, today, ws_slug, "chat", filename)
+
+ # Markdown generation via shared exporter
+ md = cursor_ide_chat_to_markdown(
+ composer_data=cd,
+ composer_id=composer_id,
+ bubble_map=bubble_map,
+ code_block_diff_map=code_block_diff_map,
+ workspace_info={"ws_slug": ws_slug, "ws_display_name": ws_display_name},
+ )
rel_path = os.path.join(today, ws_slug, "chat", filename)
- exported.append({"id": composer_id, "rel_path": rel_path, "content": md,
- "out_path": out_path, "updatedAt": updated_at,
- "title": title, "workspace": ws_display_name})
+ exported.append({
+ "id": composer_id,
+ "rel_path": rel_path,
+ "content": md,
+ "out_path": out_path,
+ "updatedAt": updated_at,
+ "title": title,
+ "workspace": ws_display_name,
+ })
count += 1
- # --- Cursor CLI sessions ---
+ # ── Cursor CLI sessions ──────────────────────────────────────────────────
try:
cli_projects = list_cli_projects(get_cli_chats_path())
except Exception as e:
@@ -868,10 +414,8 @@ def assign_workspace(cd, cid):
title_slug = slug(title)
ts_str = datetime.fromtimestamp(created_ms / 1000).strftime("%Y-%m-%dT%H-%M-%S")
filename = f"{ts_str}__{title_slug}__{session_id[:8]}.md"
- rel_dir = os.path.join(today, ws_slug_cli, "cli")
- out_path = os.path.join(out_dir, rel_dir, filename)
+ out_path = os.path.join(out_dir, today, ws_slug_cli, "cli", filename)
- # Delegate Markdown generation to the shared exporter.
md = cursor_cli_session_to_markdown(
session["db_path"],
session_meta=meta,
@@ -904,7 +448,6 @@ def assign_workspace(cd, cid):
os.makedirs(out_dir, exist_ok=True)
if use_zip:
- # Archive all exported Markdown files into a single zip
zip_name = f"cursor-export-{today}.zip"
zip_path = os.path.join(out_dir, zip_name)
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
@@ -912,16 +455,13 @@ def assign_workspace(cd, cid):
zf.writestr(entry["rel_path"], entry["content"])
print(f"Exported {count} chat(s) to {zip_path}")
else:
- # Write individual Markdown files to disk
for entry in exported:
os.makedirs(os.path.dirname(entry["out_path"]), exist_ok=True)
with open(entry["out_path"], "w", encoding="utf-8") as f:
f.write(entry["content"])
- # Manifest in output directory
manifest_path = os.path.join(out_dir, "manifest.jsonl")
existing = _load_manifest_entries(manifest_path)
-
for entry in exported:
existing[entry["id"]] = {
"log_id": entry["id"],
@@ -930,11 +470,9 @@ def assign_workspace(cd, cid):
"path": os.path.relpath(entry["out_path"], out_dir),
"updated_at": datetime.fromtimestamp(entry["updatedAt"] / 1000).isoformat() if entry["updatedAt"] else datetime.now().isoformat(),
}
-
if existing:
_write_manifest_entries(manifest_path, existing)
- # Canonical manifest in user state dir so tracking survives changing --out paths
global_manifest_path = os.path.join(state_dir, "manifest.jsonl")
global_existing = _load_manifest_entries(global_manifest_path)
for entry in exported:
@@ -949,7 +487,6 @@ def assign_workspace(cd, cid):
_write_manifest_entries(global_manifest_path, global_existing)
print(f"Exported {count} chat(s) to {out_dir}")
- # Save state
state = {
"lastExportTime": datetime.now().isoformat(),
"exportedCount": count,
diff --git a/services/workspace_db.py b/services/workspace_db.py
index 16c7103..f4ffcac 100644
--- a/services/workspace_db.py
+++ b/services/workspace_db.py
@@ -1,13 +1,116 @@
from __future__ import annotations
import json
+import logging
import os
import sqlite3
from contextlib import closing, contextmanager
from pathlib import Path
+_logger = logging.getLogger(__name__)
+
from utils.path_helpers import get_workspace_folder_paths
-from utils.workspace_descriptor import _read_json_file
+from utils.workspace_descriptor import read_json_file
+
+
+# ── Global-DB KV loaders ────────────────────────────────────────────────────
+# Each function accepts an already-opened sqlite3.Connection (row_factory must
+# be set to sqlite3.Row by the caller, as _open_global_db does) and returns
+# a dict, which is empty when the query fails. sqlite3.Error is caught internally
+# so an error from a missing or corrupt table does not propagate to callers.
+
+
+def load_bubble_map(global_db) -> dict[str, dict]:
+ """Load all ``bubbleId:*`` KV entries into ``{bubble_id: bubble_dict}``.
+
+ The bubble id is the third ``:``-separated key part; keys with fewer parts and non-dict JSON values are skipped.
+ A query that raises ``sqlite3.Error`` yields ``{}``; per-row decode errors are logged at DEBUG and skipped.
+ """
+ bubble_map: dict[str, dict] = {}
+ try:
+ rows = global_db.execute(
+ "SELECT key, value FROM cursorDiskKV WHERE key LIKE 'bubbleId:%'"
+ ).fetchall()
+ except sqlite3.Error:
+ return bubble_map
+ for row in rows:
+ parts = row["key"].split(":")
+ if len(parts) < 3:
+ continue
+ bid = parts[2]
+ try:
+ b = json.loads(row["value"])
+ if isinstance(b, dict):
+ bubble_map[bid] = b
+ except (json.JSONDecodeError, ValueError, KeyError, TypeError) as e:
+ _logger.debug("Skipping malformed bubbleId row %s: %s", row["key"], e)
+ return bubble_map
+
+
+def load_project_layouts_map(global_db) -> dict[str, list]:
+    """Load ``projectLayouts`` root paths from ``messageRequestContext:*`` KV entries.
+
+    Returns ``{composer_id: [root_path_str, ...]}``, keyed by the second ``:``-separated key part. String-encoded
+    layouts are JSON-decoded first. A failed query yields ``{}``; non-dict or malformed rows/layouts are skipped.
+    """
+    layouts_map: dict[str, list] = {}
+    try:
+        rows = global_db.execute(
+            "SELECT key, value FROM cursorDiskKV WHERE key LIKE 'messageRequestContext:%'"
+        ).fetchall()
+    except sqlite3.Error:
+        return layouts_map
+    for row in rows:
+        parts = row["key"].split(":")
+        if len(parts) < 2:
+            continue
+        cid = parts[1]
+        try:
+            ctx = json.loads(row["value"])
+            layouts = ctx.get("projectLayouts") if isinstance(ctx, dict) else None  # JSON null/list/str has no .get()
+            if not isinstance(layouts, list):
+                continue
+            layouts_map.setdefault(cid, [])
+            for layout in layouts:
+                try:
+                    o = json.loads(layout) if isinstance(layout, str) else layout
+                    if isinstance(o, dict) and o.get("rootPath"):
+                        layouts_map[cid].append(o["rootPath"])
+                except (json.JSONDecodeError, ValueError, KeyError, TypeError) as e:
+                    _logger.debug("Skipping malformed layout entry in %s: %s", row["key"], e)
+        except (json.JSONDecodeError, ValueError, KeyError, TypeError) as e:
+            _logger.debug("Skipping malformed messageRequestContext row %s: %s", row["key"], e)
+    return layouts_map
+
+
+def load_code_block_diff_map(global_db) -> dict[str, list]:
+ """Load ``codeBlockDiff:*`` KV entries into ``{composer_id: [diff_dict]}``.
+
+ The composer id is the second ``:``-separated key part (rows without one are skipped). Each diff dict holds the raw
+ JSON fields plus ``diffId``, the third key part or ``None`` if absent; non-dict values are skipped and a failed query yields ``{}``.
+ """
+ diff_map: dict[str, list] = {}
+ try:
+ rows = global_db.execute(
+ "SELECT key, value FROM cursorDiskKV WHERE key LIKE 'codeBlockDiff:%'"
+ ).fetchall()
+ except sqlite3.Error:
+ return diff_map
+ for row in rows:
+ parts = row["key"].split(":")
+ cid = parts[1] if len(parts) > 1 else None
+ if not cid:
+ continue
+ try:
+ d = json.loads(row["value"])
+ if isinstance(d, dict):
+ diff_map.setdefault(cid, []).append({
+ **d,
+ "diffId": parts[2] if len(parts) > 2 else None,
+ })
+ except (json.JSONDecodeError, ValueError, KeyError, TypeError) as e:
+ _logger.debug("Skipping malformed codeBlockDiff row %s: %s", row["key"], e)
+ return diff_map
def _collect_workspace_entries(workspace_path: str) -> list[dict]:
@@ -33,7 +136,7 @@ def _collect_invalid_workspace_ids(workspace_entries: list[dict]) -> set[str]:
invalid: set[str] = set()
for entry in workspace_entries:
try:
- wd = _read_json_file(entry["workspaceJsonPath"])
+ wd = read_json_file(entry["workspaceJsonPath"])
folders = get_workspace_folder_paths(wd)
if not folders:
invalid.add(entry["name"])
diff --git a/services/workspace_listing.py b/services/workspace_listing.py
index bf2fbc7..228fad2 100644
--- a/services/workspace_listing.py
+++ b/services/workspace_listing.py
@@ -12,12 +12,14 @@
normalize_file_path,
to_epoch_ms,
)
-from utils.workspace_descriptor import _read_json_file
+from utils.workspace_descriptor import read_json_file
from utils.workspace_path import get_cli_chats_path
from services.workspace_db import (
_build_composer_id_to_workspace_id,
_collect_invalid_workspace_ids,
_collect_workspace_entries,
+ load_bubble_map,
+ load_project_layouts_map,
_open_global_db,
)
from services.workspace_resolver import (
@@ -54,46 +56,8 @@ def _safe_fetchall(query: str, params: tuple = ()) -> list:
"SELECT key, value FROM cursorDiskKV WHERE key LIKE 'composerData:%' AND LENGTH(value) > 10"
)
- ctx_rows = _safe_fetchall(
- "SELECT key, value FROM cursorDiskKV WHERE key LIKE 'messageRequestContext:%'"
- )
- project_layouts_map: dict[str, list] = {}
- for row in ctx_rows:
- parts = row["key"].split(":")
- if len(parts) < 2:
- continue
- cid = parts[1]
- try:
- ctx = json.loads(row["value"])
- layouts = ctx.get("projectLayouts")
- if isinstance(layouts, list):
- if cid not in project_layouts_map:
- project_layouts_map[cid] = []
- for layout in layouts:
- if isinstance(layout, str):
- try:
- layout = json.loads(layout)
- except Exception:
- continue
- if isinstance(layout, dict) and layout.get("rootPath"):
- project_layouts_map[cid].append(layout["rootPath"])
- except Exception:
- pass
-
- bubble_rows = _safe_fetchall(
- "SELECT key, value FROM cursorDiskKV WHERE key LIKE 'bubbleId:%'"
- )
- bubble_map: dict[str, dict] = {}
- for row in bubble_rows:
- parts = row["key"].split(":")
- if len(parts) >= 3:
- bid = parts[2]
- try:
- b = json.loads(row["value"])
- if isinstance(b, dict):
- bubble_map[bid] = b
- except Exception:
- pass
+ project_layouts_map: dict[str, list] = load_project_layouts_map(global_db)
+ bubble_map: dict[str, dict] = load_bubble_map(global_db)
invalid_workspace_aliases = _infer_invalid_workspace_aliases(
composer_rows=composer_rows,
@@ -145,7 +109,7 @@ def _safe_fetchall(query: str, params: tuple = ()) -> list:
for entry in workspace_entries:
norm_folder = ""
try:
- wd = _read_json_file(entry["workspaceJsonPath"])
+ wd = read_json_file(entry["workspaceJsonPath"])
folders = get_workspace_folder_paths(wd)
first_folder = folders[0] if folders else None
if first_folder:
diff --git a/services/workspace_resolver.py b/services/workspace_resolver.py
index 2a13efb..c27da96 100644
--- a/services/workspace_resolver.py
+++ b/services/workspace_resolver.py
@@ -13,7 +13,7 @@
get_workspace_folder_paths,
normalize_file_path,
)
-from utils.workspace_descriptor import _basename_from_pathish, _read_json_file
+from utils.workspace_descriptor import basename_from_pathish, read_json_file
from services.workspace_db import _open_global_db
from models import SchemaError, Workspace
@@ -24,7 +24,7 @@ def _get_workspace_display_name(workspace_path: str, workspace_id: str) -> str:
return "Other chats"
wj_path = os.path.join(workspace_path, workspace_id, "workspace.json")
try:
- workspace = Workspace.from_dict(_read_json_file(wj_path), workspace_id=workspace_id)
+ workspace = Workspace.from_dict(read_json_file(wj_path), workspace_id=workspace_id)
name = get_workspace_display_name(workspace.raw)
if name:
return name
@@ -103,7 +103,7 @@ def _infer_workspace_name_from_context(workspace_path: str, workspace_id: str) -
obj = layout
if not isinstance(obj, dict):
continue
- hint = _basename_from_pathish(obj.get("rootPath"))
+ hint = basename_from_pathish(obj.get("rootPath"))
if hint:
counts[hint] = counts.get(hint, 0) + 1
@@ -121,7 +121,7 @@ def _get_project_from_file_path(
best_len = 0
for entry in workspace_entries:
try:
- wd = _read_json_file(entry["workspaceJsonPath"])
+ wd = read_json_file(entry["workspaceJsonPath"])
for folder in get_workspace_folder_paths(wd):
wp = normalize_file_path(folder)
try:
@@ -140,7 +140,7 @@ def _create_project_name_to_workspace_id_map(workspace_entries):
mapping = {}
for entry in workspace_entries:
try:
- wd = _read_json_file(entry["workspaceJsonPath"])
+ wd = read_json_file(entry["workspaceJsonPath"])
for folder in get_workspace_folder_paths(wd):
wp = re.sub(r"^file://", "", folder)
parts = wp.replace("\\", "/").split("/")
@@ -156,7 +156,7 @@ def _create_workspace_path_to_id_map(workspace_entries):
out = {}
for entry in workspace_entries:
try:
- wd = _read_json_file(entry["workspaceJsonPath"])
+ wd = read_json_file(entry["workspaceJsonPath"])
for folder in get_workspace_folder_paths(wd):
normalized = normalize_file_path(folder)
out[normalized] = entry["name"]
@@ -269,7 +269,7 @@ def _determine_project_for_conversation(
folder_name_to_ws = []
for entry in workspace_entries:
try:
- wd = _read_json_file(entry["workspaceJsonPath"])
+ wd = read_json_file(entry["workspaceJsonPath"])
for folder in get_workspace_folder_paths(wd):
name = re.sub(r"^file://", "", folder).replace("\\", "/").split("/")[-1]
if name:
diff --git a/services/workspace_tabs.py b/services/workspace_tabs.py
index 5b8d559..42fa807 100644
--- a/services/workspace_tabs.py
+++ b/services/workspace_tabs.py
@@ -2,7 +2,6 @@
import json
import os
-import re
import sqlite3
from datetime import datetime
from typing import Any
@@ -15,12 +14,13 @@
from utils.exclusion_rules import build_searchable_text, is_excluded_by_rules
from utils.text_extract import extract_text_from_bubble
from utils.tool_parser import parse_tool_call as _parse_tool_call
-from utils.workspace_descriptor import _read_json_file
+from utils.workspace_descriptor import read_json_file
from models import Bubble, Composer, SchemaError
from services.workspace_db import (
_build_composer_id_to_workspace_id,
_collect_invalid_workspace_ids,
_collect_workspace_entries,
+ load_code_block_diff_map,
_open_global_db,
)
from services.workspace_resolver import (
@@ -32,10 +32,6 @@
)
-def _extract_chat_id_from_code_block_diff_key(key: str) -> str | None:
- m = re.match(r"^codeBlockDiff:([^:]+):", key)
- return m.group(1) if m else None
-
def _try_loads_kv_value(raw: str | None) -> Any | None:
"""Parse a cursorDiskKV ``value`` column; ``None`` on missing or unparseable input (no raise)."""
@@ -68,7 +64,7 @@ def assemble_workspace_tabs(
target_folder = ""
wj_path = os.path.join(workspace_path, workspace_id, "workspace.json")
try:
- wd = _read_json_file(wj_path)
+ wd = read_json_file(wj_path)
folders = get_workspace_folder_paths(wd)
first_folder = folders[0] if folders else None
if first_folder:
@@ -78,7 +74,7 @@ def assemble_workspace_tabs(
if target_folder:
for entry in workspace_entries:
try:
- wd2 = _read_json_file(entry["workspaceJsonPath"])
+ wd2 = read_json_file(entry["workspaceJsonPath"])
folders2 = get_workspace_folder_paths(wd2)
f2 = folders2[0] if folders2 else None
if f2 and normalize_file_path(f2) == target_folder:
@@ -120,17 +116,7 @@ def _safe_fetchall(query: str, params: tuple = ()) -> list:
print(f"Schema drift in bubble {bid}: {e}")
# Load codeBlockDiffs
- for row in _safe_fetchall("SELECT key, value FROM cursorDiskKV WHERE key LIKE 'codeBlockDiff:%'"):
- chat_id = _extract_chat_id_from_code_block_diff_key(row["key"])
- if not chat_id:
- continue
- d = _try_loads_kv_value(row["value"])
- if not isinstance(d, dict):
- continue
- code_block_diff_map.setdefault(chat_id, []).append({
- **d,
- "diffId": row["key"].split(":")[2] if len(row["key"].split(":")) > 2 else None,
- })
+ code_block_diff_map = load_code_block_diff_map(global_db)
# Load messageRequestContext rows once; build both
# message_request_context_map and project_layouts_map from the same pass.
diff --git a/utils/cursor_md_exporter.py b/utils/cursor_md_exporter.py
index 8ec9d91..eaedf88 100644
--- a/utils/cursor_md_exporter.py
+++ b/utils/cursor_md_exporter.py
@@ -1,9 +1,17 @@
-"""Markdown export for Cursor CLI agent sessions.
+"""Markdown export for Cursor chat sessions.
-Exposes ``cursor_cli_session_to_markdown`` — a reusable function that
-generates a complete Markdown document (YAML frontmatter + body) from a
-Cursor CLI ``store.db`` session. The logic is shared between
-``scripts/export.py`` and any programmatic caller.
+Two public functions:
+
+* ``cursor_cli_session_to_markdown`` — generates a Markdown document from a
+ Cursor CLI ``store.db`` session (agent/CLI chat).
+
+* ``cursor_ide_chat_to_markdown`` — generates a Markdown document from a
+ Cursor IDE composer session (global-storage ``composerData:`` entry). The
+ caller supplies the pre-loaded ``bubble_map`` and optional
+ ``code_block_diff_map`` so this function never touches the database.
+
+Both are shared between ``scripts/export.py``, ``api/export_api.py``, and any
+programmatic caller.
"""
from __future__ import annotations
@@ -13,15 +21,12 @@
from pathlib import Path
from utils.cli_chat_reader import traverse_blobs, messages_to_bubbles
+from utils.path_helpers import to_epoch_ms
+from utils.text_extract import extract_text_from_bubble, slug
+from utils.tool_parser import parse_tool_call
-def _slug(s: str) -> str:
- """Simple slug: collapse whitespace and special chars to dashes."""
- import re
- s = re.sub(r'[<>:"/\\|?*]', "_", s or "")
- s = re.sub(r"\s+", "-", s)
- s = re.sub(r"-+", "-", s)
- return s.strip("-")[:80] or "untitled"
+# ── CLI session exporter ─────────────────────────────────────────────────────
def cursor_cli_session_to_markdown(
@@ -180,3 +185,323 @@ def cursor_cli_session_to_markdown(
body += "---\n\n"
return fm_str + header + body
+
+
+# ── IDE chat exporter ────────────────────────────────────────────────────────
+
+
+def cursor_ide_chat_to_markdown(
+    composer_data: dict,
+    composer_id: str,
+    bubble_map: dict,
+    code_block_diff_map: dict | None = None,
+    workspace_info: dict | None = None,
+) -> str:
+    """Generate a complete Markdown document from a Cursor IDE composer session.
+
+    The document is assembled from four parts, in order: YAML frontmatter,
+    a title header with a one-line metadata strip, an optional
+    "Session Summary" (files touched, commands run, tool result counts) and
+    the conversation body with one ``### User`` / ``### Assistant`` section
+    per message.
+
+    Parameters
+    ----------
+    composer_data:
+        Parsed value of a ``composerData:`` KV entry from global storage.
+        Keys read here: ``name``, ``modelConfig``, ``createdAt``,
+        ``lastUpdatedAt``, ``fullConversationHeadersOnly``,
+        ``totalLinesAdded`` and ``totalLinesRemoved``.
+    composer_id:
+        The composer UUID — used as ``log_id`` in frontmatter, as the key
+        into ``code_block_diff_map``, and (first 8 characters) in the
+        fallback title when the session has no name.
+    bubble_map:
+        Global ``{bubble_id: bubble_dict}`` map loaded from
+        ``cursorDiskKV`` (see ``services.workspace_db.load_bubble_map``).
+        Headers whose bubble is missing from the map are skipped.
+    code_block_diff_map:
+        Optional ``{composer_id: [diff_dict]}`` map.  Each diff becomes a
+        synthetic assistant message carrying the JSON dump of the diff.
+        When ``None`` no code edit bubbles are appended.
+    workspace_info:
+        Optional dict with workspace display fields.  Recognised keys:
+        ``ws_slug`` (str, default ``"other-chats"``) and
+        ``ws_display_name`` (str, default ``"Other chats"``).
+
+    Returns
+    -------
+    str
+        Full Markdown text including YAML frontmatter and conversation body.
+    """
+    cd = composer_data
+    ws_info = workspace_info or {}
+    ws_slug = ws_info.get("ws_slug", "other-chats")
+    ws_display_name = ws_info.get("ws_display_name", "Other chats")
+    diffs = (code_block_diff_map or {}).get(composer_id, [])
+
+    title = cd.get("name") or f"Chat {composer_id[:8]}"
+    model_config = cd.get("modelConfig") or {}
+    model_name = model_config.get("modelName")
+    updated_at = to_epoch_ms(cd.get("lastUpdatedAt")) or to_epoch_ms(cd.get("createdAt")) or 0
+    created_ms = to_epoch_ms(cd.get("createdAt")) or updated_at or int(datetime.now().timestamp() * 1000)
+    headers = cd.get("fullConversationHeadersOnly") or []
+
+    # ── Build bubble list ─────────────────────────────────────────────────────
+    bubbles: list[dict] = []
+    for h in headers:
+        b = bubble_map.get(h.get("bubbleId"))
+        if not b:
+            continue
+        text = extract_text_from_bubble(b)
+        has_tool = isinstance(b.get("toolFormerData"), dict)
+        has_thinking = bool(b.get("thinking"))
+        # Drop bubbles that carry nothing renderable at all.
+        if not text.strip() and not has_tool and not has_thinking:
+            continue
+        # A tool-only bubble still needs a visible line of text.
+        if not text.strip() and has_tool:
+            text = f"**Tool: {b['toolFormerData'].get('name', 'unknown')}**"
+
+        btype = "user" if h.get("type") == 1 else "ai"
+
+        thinking = None
+        thinking_duration_ms = None
+        if b.get("thinking"):
+            # ``thinking`` is accepted either as a plain string or as a dict
+            # with a ``text`` field; any other shape yields no thinking text.
+            thinking = (
+                b["thinking"] if isinstance(b["thinking"], str)
+                else (b["thinking"].get("text") if isinstance(b["thinking"], dict) else None)
+            )
+            thinking_duration_ms = b.get("thinkingDurationMs")
+
+        tool_info = parse_tool_call(b["toolFormerData"]) if has_tool else None
+
+        model_info = (b.get("modelInfo") or {}).get("modelName")
+        if model_info == "default":
+            model_info = None
+
+        ctx_window = b.get("contextWindowStatusAtCreation") or {}
+        # ``or 0`` (rather than a ``.get`` default) also covers keys that are
+        # present but null, which would make the ``> 0`` checks below raise.
+        ctx_tokens_used = ctx_window.get("tokensUsed") or 0
+        ctx_token_limit = ctx_window.get("tokenLimit") or 0
+        ctx_pct_remaining = (
+            ctx_window.get("percentageRemainingFloat") or ctx_window.get("percentageRemaining")
+        )
+
+        bubbles.append({
+            "type": btype,
+            "text": text,
+            "timestamp": (
+                to_epoch_ms(b.get("createdAt"))
+                or to_epoch_ms(b.get("timestamp"))
+                or int(datetime.now().timestamp() * 1000)
+            ),
+            "tool": tool_info,
+            "thinking": thinking,
+            "thinkingDurationMs": thinking_duration_ms,
+            "model": model_info,
+            "contextTokensUsed": ctx_tokens_used if ctx_tokens_used > 0 else None,
+            "contextTokenLimit": ctx_token_limit if ctx_token_limit > 0 else None,
+            "contextPctRemaining": round(ctx_pct_remaining, 1) if ctx_pct_remaining else None,
+        })
+
+    # Append code-block diffs as synthetic AI bubbles.  They only carry
+    # type/text/timestamp, which is why every later access to the optional
+    # fields goes through ``.get``.  All of them share the session's
+    # last-updated time (falling back to created time, then to "now").
+    diff_ts = updated_at or int(datetime.now().timestamp() * 1000)
+    for d in diffs:
+        bubbles.append({
+            "type": "ai",
+            "text": f"**Code edit:** {json.dumps(d)}",
+            "timestamp": diff_ts,
+        })
+
+    bubbles.sort(key=lambda bub: bub.get("timestamp") or 0)
+
+    # ── Compute response times ────────────────────────────────────────────────
+    # Every assistant bubble is measured against the most recent user bubble;
+    # consecutive assistant bubbles are all measured from that same message.
+    last_user_ts = None
+    for bub in bubbles:
+        if bub["type"] == "user":
+            last_user_ts = bub.get("timestamp")
+        elif bub["type"] == "ai" and last_user_ts:
+            bts = bub.get("timestamp")
+            if bts and bts > last_user_ts:
+                bub["responseTimeMs"] = bts - last_user_ts
+
+    # ── Session-level aggregates ──────────────────────────────────────────────
+    total_response_ms = sum(bub.get("responseTimeMs", 0) for bub in bubbles)
+    total_thinking_ms = sum(bub.get("thinkingDurationMs", 0) or 0 for bub in bubbles)
+    total_tool_calls = sum(1 for bub in bubbles if bub.get("tool"))
+    max_ctx_used = max((bub.get("contextTokensUsed") or 0 for bub in bubbles), default=0)
+    ctx_limit = max((bub.get("contextTokenLimit") or 0 for bub in bubbles), default=0)
+    # Null-safe: a stored null must not end up as "lines_added: None".
+    lines_added = cd.get("totalLinesAdded") or 0
+    lines_removed = cd.get("totalLinesRemoved") or 0
+
+    tool_breakdown: dict[str, int] = {}
+    for bub in bubbles:
+        if bub.get("tool"):
+            tn = bub["tool"].get("name", "unknown")
+            tool_breakdown[tn] = tool_breakdown.get(tn, 0) + 1
+
+    ts_vals = [bub["timestamp"] for bub in bubbles if bub.get("timestamp")]
+    wall_clock_sec = int((max(ts_vals) - min(ts_vals)) / 1000) if len(ts_vals) >= 2 else None
+
+    # ── File / command activity ───────────────────────────────────────────────
+    files_read_list: list[str] = []
+    files_written_list: list[str] = []
+    commands_run_list: list[str] = []
+    tool_result_stats = {
+        "terminal_success": 0, "terminal_error": 0,
+        "file_reads": 0, "file_edits": 0,
+        "searches": 0, "web": 0,
+    }
+    for bub in bubbles:
+        if not bub.get("tool"):
+            continue
+        t = bub["tool"]
+        tn = t.get("name", "")
+        status = t.get("status") or ""
+        raw_input = str(t.get("input") or "").strip()
+        # For file tools the first line of the input is recorded as the path.
+        first_line = raw_input.split("\n")[0] if raw_input else ""
+        if tn == "read_file_v2" and first_line:
+            files_read_list.append(first_line)
+            tool_result_stats["file_reads"] += 1
+        elif tn == "edit_file_v2" and first_line:
+            files_written_list.append(first_line)
+            tool_result_stats["file_edits"] += 1
+        elif tn == "run_terminal_command_v2" and raw_input:
+            commands_run_list.append(raw_input)
+            if status in ("error", "failed"):
+                tool_result_stats["terminal_error"] += 1
+            else:
+                tool_result_stats["terminal_success"] += 1
+        elif tn in ("ripgrep_raw_search", "glob_file_search", "semantic_search_full"):
+            tool_result_stats["searches"] += 1
+        elif tn in ("web_search", "web_fetch"):
+            tool_result_stats["web"] += 1
+
+    # ── Frontmatter ───────────────────────────────────────────────────────────
+    # Free-form strings go through json.dumps so they are emitted as quoted,
+    # escaped scalars.
+    fm_lines = ["---"]
+    fm_lines.append(f"log_id: {json.dumps(composer_id, ensure_ascii=False)}")
+    fm_lines.append("log_type: chat")
+    fm_lines.append(f"title: {json.dumps(title, ensure_ascii=False)}")
+    fm_lines.append(f"created_at: {datetime.fromtimestamp(created_ms / 1000).isoformat()}")
+    fm_lines.append(
+        f"updated_at: {datetime.fromtimestamp(updated_at / 1000).isoformat() if updated_at else datetime.now().isoformat()}"
+    )
+    fm_lines.append(f"workspace: {ws_slug}")
+    fm_lines.append(f"workspace_name: {json.dumps(ws_display_name, ensure_ascii=False)}")
+    if model_name and model_name != "default":
+        fm_lines.append(f"model: {json.dumps(model_name, ensure_ascii=False)}")
+    fm_lines.append(f"message_count: {len(bubbles)}")
+    if total_tool_calls:
+        fm_lines.append(f"total_tool_calls: {total_tool_calls}")
+    if tool_breakdown:
+        fm_lines.append("tool_call_breakdown:")
+        # Most frequently used tools first.
+        for tn, cnt in sorted(tool_breakdown.items(), key=lambda x: -x[1]):
+            fm_lines.append(f" {json.dumps(tn, ensure_ascii=False)}: {cnt}")
+    total_think = sum(1 for bub in bubbles if bub.get("thinking"))
+    if total_think:
+        fm_lines.append(f"thinking_count: {total_think}")
+    if wall_clock_sec is not None:
+        fm_lines.append(f"wall_clock_seconds: {wall_clock_sec}")
+    if total_response_ms:
+        fm_lines.append(f"total_response_time_sec: {total_response_ms / 1000:.1f}")
+    if total_thinking_ms:
+        fm_lines.append(f"total_thinking_time_sec: {total_thinking_ms / 1000:.1f}")
+    if max_ctx_used and ctx_limit:
+        fm_lines.append(f"max_context_tokens_used: {max_ctx_used}")
+        fm_lines.append(f"context_token_limit: {ctx_limit}")
+    if lines_added or lines_removed:
+        fm_lines.append(f"lines_added: {lines_added}")
+        fm_lines.append(f"lines_removed: {lines_removed}")
+    if files_read_list or files_written_list:
+        fm_lines.append(f"files_read: {len(files_read_list)}")
+        fm_lines.append(f"files_written: {len(files_written_list)}")
+    if commands_run_list:
+        fm_lines.append(f"commands_run: {len(commands_run_list)}")
+    fm_lines.append("---")
+    fm_str = "\n".join(fm_lines) + "\n\n"
+
+    # ── Document header ───────────────────────────────────────────────────────
+    header = f"# {title}\n\n"
+    meta_parts: list[str] = []
+    if created_ms:
+        meta_parts.append(f"Created: {datetime.fromtimestamp(created_ms / 1000).strftime('%Y-%m-%d %H:%M:%S')}")
+    if model_name and model_name != "default":
+        meta_parts.append(f"Model: {model_name}")
+    if total_tool_calls:
+        meta_parts.append(f"Tool calls: {total_tool_calls}")
+    if wall_clock_sec is not None:
+        hrs, rem = divmod(wall_clock_sec, 3600)
+        mins, secs = divmod(rem, 60)
+        # Show the two most significant units only.
+        dur = f"{hrs}h {mins}m" if hrs else (f"{mins}m {secs}s" if mins else f"{secs}s")
+        meta_parts.append(f"Duration: {dur}")
+    header += f"_{' | '.join(meta_parts)}_\n\n---\n\n" if meta_parts else "---\n\n"
+
+    # ── Session summary block ─────────────────────────────────────────────────
+    # Emitted only when at least one file or command was recorded.  Pieces are
+    # collected in a list and joined once instead of growing a string.
+    summary_parts: list[str] = []
+    if files_read_list or files_written_list or commands_run_list:
+        summary_parts.append("## Session Summary\n\n")
+        if files_written_list or files_read_list:
+            summary_parts.append("### Files Touched\n\n")
+            summary_parts.append("| Action | File |\n|--------|------|\n")
+            summary_parts.extend(f"| Edit | `{fp}` |\n" for fp in files_written_list)
+            summary_parts.extend(f"| Read | `{fp}` |\n" for fp in files_read_list)
+            summary_parts.append("\n")
+        if commands_run_list:
+            summary_parts.append("### Commands Run\n\n")
+            summary_parts.extend(
+                f"{i}. `{cmd}`\n" for i, cmd in enumerate(commands_run_list, 1)
+            )
+            summary_parts.append("\n")
+        non_zero = {k: v for k, v in tool_result_stats.items() if v > 0}
+        if non_zero:
+            summary_parts.append("### Tool Results\n\n")
+            labels = {
+                "terminal_success": "Terminal Success",
+                "terminal_error": "Terminal Error",
+                "file_reads": "File Reads",
+                "file_edits": "File Edits",
+                "searches": "Searches",
+                "web": "Web Fetches",
+            }
+            summary_parts.extend(
+                f"- {labels.get(k, k)}: {v}\n" for k, v in non_zero.items()
+            )
+            summary_parts.append("\n")
+        summary_parts.append("---\n\n")
+    summary = "".join(summary_parts)
+
+    # ── Body ──────────────────────────────────────────────────────────────────
+    body_parts: list[str] = []
+    for bub in bubbles:
+        role = "User" if bub["type"] == "user" else "Assistant"
+        body_parts.append(f"### {role}\n\n")
+        bub_meta: list[str] = []
+        if bub.get("model"):
+            bub_meta.append(f"Model: {bub['model']}")
+        if bub.get("responseTimeMs"):
+            bub_meta.append(f"Response: {bub['responseTimeMs'] / 1000:.1f}s")
+        if bub.get("thinkingDurationMs"):
+            bub_meta.append(f"Thinking: {bub['thinkingDurationMs'] / 1000:.1f}s")
+        if bub.get("contextTokensUsed") and bub.get("contextTokenLimit"):
+            pct = bub["contextTokensUsed"] / bub["contextTokenLimit"] * 100
+            bub_meta.append(
+                f"Context: {bub['contextTokensUsed']:,} / {bub['contextTokenLimit']:,}"
+                f" tokens ({pct:.0f}% used)"
+            )
+        elif bub.get("contextPctRemaining") is not None:
+            bub_meta.append(f"Context: {bub['contextPctRemaining']}% remaining")
+        if bub_meta:
+            body_parts.append(f"_{' | '.join(bub_meta)}_\n\n")
+        if bub.get("timestamp"):
+            body_parts.append(f"_{datetime.fromtimestamp(bub['timestamp'] / 1000).isoformat()}_\n\n")
+        if bub.get("thinking"):
+            dur_str = (
+                f" ({bub['thinkingDurationMs'] / 1000:.1f}s)"
+                if bub.get("thinkingDurationMs") else ""
+            )
+            # Collapsible section; the blank lines around the content let
+            # Markdown inside the HTML element render as Markdown.
+            body_parts.append(
+                f"<details><summary>Thinking{dur_str}</summary>\n\n"
+                f"{bub['thinking']}\n\n</details>\n\n"
+            )
+        body_parts.append(bub["text"] + "\n\n")
+        if bub.get("tool"):
+            t = bub["tool"]
+            tool_summary = t.get("summary") or t.get("name") or "unknown"
+            tool_status = t.get("status") or ""
+            status_str = f" ({tool_status})" if tool_status else ""
+            body_parts.append(f"> **Tool: {tool_summary}**{status_str}\n")
+            # Input/output are rendered as fenced code inside the blockquote,
+            # so every line needs its own "> " prefix.
+            if t.get("input"):
+                body_parts.append("> **INPUT:**\n> ```\n")
+                body_parts.extend(f"> {iline}\n" for iline in str(t["input"]).split("\n"))
+                body_parts.append("> ```\n")
+            if t.get("output"):
+                body_parts.append("> **OUTPUT:**\n> ```\n")
+                body_parts.extend(f"> {oline}\n" for oline in str(t["output"]).split("\n"))
+                body_parts.append("> ```\n")
+            body_parts.append("\n")
+        body_parts.append("---\n\n")
+
+    return fm_str + header + summary + "".join(body_parts)
diff --git a/utils/text_extract.py b/utils/text_extract.py
index f4a80c9..d0b179c 100644
--- a/utils/text_extract.py
+++ b/utils/text_extract.py
@@ -1,6 +1,7 @@
"""Text extraction helpers mirroring the bubble/richText parsing in the Node.js codebase."""
import json
+import re
def extract_text_from_rich_text(children: list) -> str:
@@ -51,6 +52,15 @@ def extract_text_from_bubble(bubble: dict) -> str:
return text
+def slug(s: str) -> str:
+    """Convert a string to a filesystem-safe slug (max 80 chars).
+
+    The characters ``<>:"/\\|?*`` each become ``_``, every whitespace run
+    becomes a single ``-``, repeated dashes are collapsed, and leading and
+    trailing dashes are removed before the result is cut to 80 characters.
+    A falsy input, or one that reduces to nothing, yields ``"untitled"``.
+    """
+    # Applied strictly in this order: later rules clean up after earlier ones.
+    substitutions = (
+        (r'[<>:"/\\|?*]', "_"),
+        (r"\s+", "-"),
+        (r"-+", "-"),
+    )
+    cleaned = s or ""
+    for pattern, replacement in substitutions:
+        cleaned = re.sub(pattern, replacement, cleaned)
+    shortened = cleaned.strip("-")[:80]
+    if shortened:
+        return shortened
+    return "untitled"
+
+
def format_tool_action(action: dict) -> str:
"""Format a tool action / codeBlockDiff into readable text."""
if not action:
diff --git a/utils/workspace_descriptor.py b/utils/workspace_descriptor.py
index df0a206..ea60780 100644
--- a/utils/workspace_descriptor.py
+++ b/utils/workspace_descriptor.py
@@ -7,7 +7,7 @@
from urllib.parse import unquote, urlparse
-def _read_json_file(path: str):
+def read_json_file(path: str):
"""Read a workspace.json with Cursor indirection applied."""
return _resolve_workspace_descriptor(path)
@@ -70,7 +70,7 @@ def _resolve_workspace_descriptor(path: str, depth: int = 0):
return out
-def _basename_from_pathish(path_value: str | None) -> str | None:
+def basename_from_pathish(path_value: str | None) -> str | None:
"""Extract a readable leaf folder name from file URI or filesystem path."""
if not path_value:
return None