From 72f0f966e91821dfad105ce0bd83b7f1b0a61890 Mon Sep 17 00:00:00 2001 From: star-med Date: Tue, 19 May 2026 06:47:40 +0800 Subject: [PATCH 1/8] Add search limit validation, typed models, and mypy strict CI - Return 400 with a clear JSON error when /api/search limit is invalid instead of crashing with ValueError on bad input (abc, 1.5, etc.) - Add models/ TypedDict shapes and annotate parser/API boundaries - Enable mypy --strict on api, utils, and models; add CI mypy job - Add tests/test_search.py for limit validation cases --- .github/workflows/ci.yml | 17 ++++++ api/_flask_types.py | 12 ++++ api/export_api.py | 48 ++++++++------- api/projects.py | 29 ++++++--- api/search.py | 30 +++++++-- api/sessions.py | 26 ++++---- models/__init__.py | 28 +++++++++ models/errors.py | 7 +++ models/export.py | 10 +++ models/project.py | 30 +++++++++ models/search.py | 12 ++++ models/session.py | 74 ++++++++++++++++++++++ models/stats.py | 26 ++++++++ pyproject.toml | 9 +++ requirements-dev.txt | 2 + tests/test_search.py | 75 +++++++++++++++++++++++ utils/exclusion_rules.py | 25 ++++---- utils/export_day_filter.py | 22 ++++--- utils/export_state_store.py | 26 +++++--- utils/json_exporter.py | 16 +++-- utils/jsonl_parser.py | 118 ++++++++++++++++++++---------------- utils/md_exporter.py | 45 ++++++++------ utils/session_path.py | 19 +++--- utils/session_stats.py | 33 +++++----- 24 files changed, 569 insertions(+), 170 deletions(-) create mode 100644 api/_flask_types.py create mode 100644 models/__init__.py create mode 100644 models/errors.py create mode 100644 models/export.py create mode 100644 models/project.py create mode 100644 models/search.py create mode 100644 models/session.py create mode 100644 models/stats.py create mode 100644 pyproject.toml create mode 100644 tests/test_search.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ee2f5fd..39d869d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -57,3 +57,20 @@ jobs: - name: Run tests run: pytest --tb=short -q + + mypy: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - uses: actions/setup-python@v5 + with: + python-version: "3.12" + cache: pip + cache-dependency-path: requirements-dev.txt + + - name: Install dev dependencies + run: pip install -r requirements-dev.txt + + - name: Run mypy (strict) + run: mypy diff --git a/api/_flask_types.py b/api/_flask_types.py new file mode 100644 index 0000000..70d8b02 --- /dev/null +++ b/api/_flask_types.py @@ -0,0 +1,12 @@ +"""Shared Flask handler return types for mypy.""" + +from typing import Any, Union, cast + +from flask import Response, jsonify + +FlaskReturn = Union[Response, tuple[Response, int]] + + +def json_ok(*args: Any, **kwargs: Any) -> Response: + """Typed wrapper around :func:`flask.jsonify`.""" + return cast(Response, jsonify(*args, **kwargs)) diff --git a/api/export_api.py b/api/export_api.py index a03653e..b108816 100644 --- a/api/export_api.py +++ b/api/export_api.py @@ -5,8 +5,12 @@ import os import zipfile from datetime import datetime +from typing import Any -from flask import Blueprint, current_app, jsonify, request, send_file +from flask import Blueprint, current_app, request, send_file + +from api._flask_types import FlaskReturn, json_ok +from models.export import ExportStateDict from utils.export_state_store import ( EXPORT_STATE_FILE, @@ -33,24 +37,24 @@ _STATE_FILE = EXPORT_STATE_FILE -def _state_lock(): +def _state_lock() -> Any: return export_state_lock(_STATE_FILE) -def _load_state_from_disk() -> dict: +def _load_state_from_disk() -> ExportStateDict: return load_export_state_from_disk(_STATE_FILE) -def _atomic_write_state(state: dict) -> None: +def _atomic_write_state(state: ExportStateDict) -> None: atomic_write_export_state(state, _STATE_FILE) -def _read_state() -> dict: +def _read_state() -> ExportStateDict: with _state_lock(): return _load_state_from_disk() -def _write_state(sessions_map: dict, count: int) -> None: +def _write_state(sessions_map: dict[str, float], count: int) -> None: """Persist merge of *sessions_map* and update last-export metadata (*count* = this run only).""" with _state_lock(): state = _load_state_from_disk() @@ -61,10 +65,10 @@ def _write_state(sessions_map: dict, count: int) -> None: @export_bp.route("/api/export/state") -def get_export_state(): +def get_export_state() -> FlaskReturn: state = _read_state() n = state.get("exportedCount", 0) - return jsonify( + return json_ok( { "last_export_time": state.get("lastExportTime"), # Sessions exported in the last completed bulk export (not a lifetime total). @@ -75,16 +79,16 @@ def get_export_state(): @export_bp.route("/api/export", methods=["POST"]) -def bulk_export(): +def bulk_export() -> FlaskReturn: body = request.get_json(silent=True) if body is None: body = {} if not isinstance(body, dict): - return jsonify({"error": "Invalid request body"}), 400 + return json_ok({"error": "Invalid request body"}), 400 since = body.get("since", "all") if since not in ("all", "last", "incremental"): - return jsonify({"error": "Invalid since mode", "since": since}), 400 + return json_ok({"error": "Invalid since mode", "since": since}), 400 base = ( current_app.config.get("CLAUDE_PROJECTS_DIR") @@ -94,14 +98,14 @@ def bulk_export(): rules = current_app.config.get("EXCLUSION_RULES") or [] state = _read_state() - last_export_sessions: dict = ( + last_export_sessions: dict[str, float] = ( state.get("sessions", {}) if since == "incremental" else {} ) buf = io.BytesIO() count = 0 - manifest = [] - new_sessions_map: dict = {} + manifest: list[dict[str, Any]] = [] + new_sessions_map: dict[str, float] = {} latest_day = None with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf: @@ -227,7 +231,7 @@ def bulk_export(): if count == 0: return ( - jsonify( + json_ok( { "error": "Nothing to export", "since": since, @@ -251,12 +255,12 @@ def bulk_export(): buf, mimetype="application/zip", as_attachment=True, - download_name=f"claude-code-export{suffix}-{date_tag}.zip", + download_name=f"claude-code-export{suffix}-{date_tag}.zip", # type: ignore[call-arg] ) @export_bp.route("/api/export/session//") -def export_session(project_name, session_id): +def export_session(project_name: str, session_id: str) -> FlaskReturn: import os from utils.session_path import safe_join @@ -267,16 +271,16 @@ def export_session(project_name, session_id): try: filepath = safe_join(base, project_name, f"{session_id}.jsonl") except ValueError: - return jsonify({"error": "Invalid path"}), 400 + return json_ok({"error": "Invalid path"}), 400 if not os.path.isfile(filepath): - return jsonify({"error": "Session not found"}), 404 + return json_ok({"error": "Session not found"}), 404 fmt = request.args.get("format", "md") session = parse_session(filepath) rules = current_app.config.get("EXCLUSION_RULES") or [] if is_session_excluded(rules, session, project_name): - return jsonify({"error": "Session not found"}), 404 + return json_ok({"error": "Session not found"}), 404 stats = compute_stats(session) title_slug = slugify(session["title"], default="session") @@ -288,7 +292,7 @@ def export_session(project_name, session_id): buf, mimetype="application/json", as_attachment=True, - download_name=f"{title_slug}.json", + download_name=f"{title_slug}.json", # type: ignore[call-arg] ) md = session_to_markdown(session, stats) @@ -298,5 +302,5 @@ def export_session(project_name, session_id): buf, mimetype="text/markdown", as_attachment=True, - download_name=f"{title_slug}.md", + download_name=f"{title_slug}.md", # type: ignore[call-arg] ) diff --git a/api/projects.py b/api/projects.py index d6a3935..e722b69 100644 --- a/api/projects.py +++ b/api/projects.py @@ -1,7 +1,11 @@ """Project listing endpoints.""" +from typing import cast + from flask import Blueprint, current_app, jsonify +from api._flask_types import FlaskReturn, json_ok +from models.project import ProjectSessionRowDict from utils.session_path import get_claude_projects_dir, list_projects, list_sessions, safe_join from utils.exclusion_rules import is_session_excluded @@ -9,7 +13,7 @@ @projects_bp.route("/api/projects") -def get_projects(): +def get_projects() -> FlaskReturn: base = current_app.config.get("CLAUDE_PROJECTS_DIR") or get_claude_projects_dir() projects = list_projects(base) @@ -36,21 +40,21 @@ def get_projects(): if latest_ts: project["last_modified"] = latest_ts - return jsonify(projects) + return json_ok(projects) @projects_bp.route("/api/projects//sessions") -def get_project_sessions(project_name): +def get_project_sessions(project_name: str) -> FlaskReturn: base = current_app.config.get("CLAUDE_PROJECTS_DIR") or get_claude_projects_dir() try: project_dir = safe_join(base, project_name) except ValueError: - return jsonify([]), 400 + return json_ok([]), 400 sessions = list_sessions(project_dir) # Add summary preview for each session from utils.jsonl_parser import parse_session rules = current_app.config.get("EXCLUSION_RULES") or [] - result = [] + result: list[ProjectSessionRowDict] = [] for s in sessions: try: parsed = parse_session(s["path"]) @@ -60,20 +64,25 @@ def get_project_sessions(project_name): continue if is_session_excluded(rules, parsed, project_name): continue - result.append({ + models = meta.get("models_used", []) + result.append(cast(ProjectSessionRowDict, { **s, "title": parsed["title"], - "models": meta["models_used"], + "models": sorted(models) if isinstance(models, set) else list(models), "tokens": meta["total_input_tokens"] + meta["total_output_tokens"], "tool_calls": meta["total_tool_calls"], "first_timestamp": meta["first_timestamp"], "last_timestamp": meta["last_timestamp"], - }) + })) except Exception: # Full detail (class, message, traceback) to the server log via # logger.exception. The per-session card carries only `error: True` # — the class-name+message string was a leak (issue #25). The # operator looks at the server log for triage. current_app.logger.exception("Failed to parse session %s", s["id"]) - result.append({**s, "title": "Error parsing session", "error": True}) - return jsonify(result) + result.append(cast(ProjectSessionRowDict, { + **s, + "title": "Error parsing session", + "error": True, + })) + return json_ok(result) diff --git a/api/search.py b/api/search.py index c323123..43ccc0f 100644 --- a/api/search.py +++ b/api/search.py @@ -4,25 +4,45 @@ from flask import Blueprint, current_app, jsonify, request +from api._flask_types import FlaskReturn, json_ok +from models.search import SearchHitDict from utils.session_path import get_claude_projects_dir, list_projects, list_sessions from utils.jsonl_parser import parse_session from utils.exclusion_rules import is_session_excluded search_bp = Blueprint("search", __name__) +_DEFAULT_LIMIT = 50 + + +def _parse_limit(raw: str | None, default: int = _DEFAULT_LIMIT) -> int: + """Parse a positive integer limit from a query string value.""" + if raw is None or raw.strip() == "": + return default + try: + value = int(raw) + except ValueError: + raise ValueError("Invalid limit: must be a positive integer") from None + if value < 1: + raise ValueError("Invalid limit: must be a positive integer") + return value + @search_bp.route("/api/search") -def search(): +def search() -> FlaskReturn: query = request.args.get("q", "").strip().lower() if not query: - return jsonify([]) + return json_ok([]) - max_results = int(request.args.get("limit", 50)) + try: + max_results = _parse_limit(request.args.get("limit")) + except ValueError as e: + return json_ok({"error": str(e)}), 400 base = current_app.config.get("CLAUDE_PROJECTS_DIR") or get_claude_projects_dir() projects = list_projects(base) rules = current_app.config.get("EXCLUSION_RULES") or [] - results = [] + results: list[SearchHitDict] = [] for project in projects: sessions = list_sessions(project["path"]) for sess_info in sessions: @@ -56,4 +76,4 @@ def search(): if len(results) >= max_results: break - return jsonify(results) + return json_ok(results) diff --git a/api/sessions.py b/api/sessions.py index 93b86f1..514eaf2 100644 --- a/api/sessions.py +++ b/api/sessions.py @@ -2,7 +2,9 @@ import os -from flask import Blueprint, current_app, jsonify, abort +from flask import Blueprint, current_app, jsonify + +from api._flask_types import FlaskReturn, json_ok from utils.session_path import get_claude_projects_dir, safe_join from utils.jsonl_parser import parse_session @@ -13,22 +15,22 @@ @sessions_bp.route("/api/sessions//") -def get_session(project_name, session_id): +def get_session(project_name: str, session_id: str) -> FlaskReturn: base = current_app.config.get("CLAUDE_PROJECTS_DIR") or get_claude_projects_dir() try: filepath = safe_join(base, project_name, f"{session_id}.jsonl") except ValueError: - return jsonify({"error": "Invalid path"}), 400 + return json_ok({"error": "Invalid path"}), 400 if not os.path.isfile(filepath): - return jsonify({"error": f"Session {session_id} not found"}), 404 + return json_ok({"error": f"Session {session_id} not found"}), 404 try: session = parse_session(filepath) rules = current_app.config.get("EXCLUSION_RULES") or [] if is_session_excluded(rules, session, project_name): - return jsonify({"error": "Session not found"}), 404 - return jsonify(session) + return json_ok({"error": "Session not found"}), 404 + return json_ok(session) except Exception: # Full traceback (class name, message, stack) goes to the server log # via logger.exception. The HTTP body returns a stable, generic @@ -36,26 +38,26 @@ def get_session(project_name, session_id): # internal field names, file paths, and user values to any client # (issue #25). current_app.logger.exception("Failed to parse session %s", session_id) - return jsonify({"error": "Failed to parse session"}), 500 + return json_ok({"error": "Failed to parse session"}), 500 @sessions_bp.route("/api/sessions///stats") -def get_session_stats(project_name, session_id): +def get_session_stats(project_name: str, session_id: str) -> FlaskReturn: base = current_app.config.get("CLAUDE_PROJECTS_DIR") or get_claude_projects_dir() try: filepath = safe_join(base, project_name, f"{session_id}.jsonl") except ValueError: - return jsonify({"error": "Invalid path"}), 400 + return json_ok({"error": "Invalid path"}), 400 if not os.path.isfile(filepath): - return jsonify({"error": f"Session {session_id} not found"}), 404 + return json_ok({"error": f"Session {session_id} not found"}), 404 try: session = parse_session(filepath) stats = compute_stats(session) - return jsonify(stats) + return json_ok(stats) except Exception: # Same pattern as get_session above — full detail to the server log, # generic message in the HTTP body (issue #25). current_app.logger.exception("Failed to compute stats for %s", session_id) - return jsonify({"error": "Failed to compute session stats"}), 500 + return json_ok({"error": "Failed to compute session stats"}), 500 diff --git a/models/__init__.py b/models/__init__.py new file mode 100644 index 0000000..5f5b21c --- /dev/null +++ b/models/__init__.py @@ -0,0 +1,28 @@ +"""Typed wire and domain shapes for claude-code-chat-browser.""" + +from models.errors import ErrorResponse +from models.export import ExportStateDict +from models.project import ProjectDict, ProjectSessionRowDict, SessionListItemDict +from models.search import SearchHitDict +from models.session import ( + MessageDict, + QuickSessionInfoDict, + SessionDict, + SessionMetadataDict, +) +from models.stats import FilesTouchedDict, SessionStatsDict + +__all__ = [ + "ErrorResponse", + "ExportStateDict", + "FilesTouchedDict", + "MessageDict", + "ProjectDict", + "ProjectSessionRowDict", + "QuickSessionInfoDict", + "SearchHitDict", + "SessionDict", + "SessionListItemDict", + "SessionMetadataDict", + "SessionStatsDict", +] diff --git a/models/errors.py b/models/errors.py new file mode 100644 index 0000000..6af19dd --- /dev/null +++ b/models/errors.py @@ -0,0 +1,7 @@ +"""HTTP error response shapes.""" + +from typing import TypedDict + + +class ErrorResponse(TypedDict): + error: str diff --git a/models/export.py b/models/export.py new file mode 100644 index 0000000..cec4513 --- /dev/null +++ b/models/export.py @@ -0,0 +1,10 @@ +"""Export state file shapes.""" + +from typing import NotRequired, TypedDict + + +class ExportStateDict(TypedDict, total=False): + lastExportTime: str + exportedCount: int + sessions: dict[str, float] + exportDir: str diff --git a/models/project.py b/models/project.py new file mode 100644 index 0000000..557fc29 --- /dev/null +++ b/models/project.py @@ -0,0 +1,30 @@ +"""Project and session listing shapes.""" + +from typing import NotRequired, TypedDict + + +class ProjectDict(TypedDict): + name: str + path: str + display_name: str + session_count: int + last_modified: NotRequired[str] + + +class SessionListItemDict(TypedDict): + id: str + path: str + size_bytes: int + modified: float + + +class ProjectSessionRowDict(SessionListItemDict, total=False): + """Session row returned by GET /api/projects//sessions.""" + + title: str + models: list[str] + tokens: int + tool_calls: int + first_timestamp: str | None + last_timestamp: str | None + error: bool diff --git a/models/search.py b/models/search.py new file mode 100644 index 0000000..a97fcc6 --- /dev/null +++ b/models/search.py @@ -0,0 +1,12 @@ +"""Search API response shapes.""" + +from typing import TypedDict + + +class SearchHitDict(TypedDict): + project: str + session_id: str + title: str + role: str + timestamp: str | None + snippet: str diff --git a/models/session.py b/models/session.py new file mode 100644 index 0000000..85a0791 --- /dev/null +++ b/models/session.py @@ -0,0 +1,74 @@ +"""Parsed session shapes from jsonl_parser.""" + +from typing import Any, NotRequired, TypedDict + + +class MessageDict(TypedDict): + role: str + uuid: NotRequired[str | None] + parent_uuid: NotRequired[str | None] + timestamp: NotRequired[str | None] + text: NotRequired[str] + content: NotRequired[str] + images: NotRequired[list[Any] | None] + is_sidechain: NotRequired[bool] + tool_result: NotRequired[Any] + tool_result_parsed: NotRequired[dict[str, Any] | None] + slug: NotRequired[str | None] + model: NotRequired[str] + stop_reason: NotRequired[str] + thinking: NotRequired[str | None] + tool_uses: NotRequired[list[dict[str, Any]] | None] + is_api_error: NotRequired[bool] + usage: NotRequired[dict[str, Any]] + subtype: NotRequired[str] + level: NotRequired[str] + data: NotRequired[Any] + progress_type: NotRequired[str] + tool_use_id: NotRequired[str | None] + parent_tool_use_id: NotRequired[str | None] + + +class SessionMetadataDict(TypedDict, total=False): + session_id: str + models_used: list[str] + total_input_tokens: int + total_output_tokens: int + total_cache_read_tokens: int + total_cache_creation_tokens: int + total_tool_calls: int + tool_call_counts: dict[str, int] + first_timestamp: str | None + last_timestamp: str | None + version: str | None + cwd: str | None + git_branch: str | None + permission_mode: str | None + compactions: int + total_ephemeral_5m_tokens: int + total_ephemeral_1h_tokens: int + service_tiers: list[str] + session_wall_time_seconds: float | None + compact_boundaries: list[dict[str, Any]] + api_errors: int + files_read: list[str] + files_written: list[str] + files_created: list[str] + bash_commands: list[Any] + web_fetches: list[Any] + sidechain_messages: int + stop_reasons: dict[str, int] + entry_counts: dict[str, int] + + +class SessionDict(TypedDict): + session_id: str + title: str + messages: list[MessageDict] + metadata: SessionMetadataDict + + +class QuickSessionInfoDict(TypedDict): + title: str + first_timestamp: str | None + last_timestamp: str | None diff --git a/models/stats.py b/models/stats.py new file mode 100644 index 0000000..f59209b --- /dev/null +++ b/models/stats.py @@ -0,0 +1,26 @@ +"""Session statistics shapes from session_stats.""" + +from typing import Any, TypedDict + + +class FilesTouchedDict(TypedDict): + read: list[str] + written: list[str] + created: list[str] + total_unique: int + + +class SessionStatsDict(TypedDict): + files_touched: FilesTouchedDict + commands_run: list[dict[str, Any]] + urls_accessed: list[Any] + conversation_turns: int + wall_clock_seconds: float | None + wall_clock_display: str | None + cost_estimate_usd: float | None + tool_result_summary: dict[str, int] + stop_reason_summary: dict[str, int] + entry_type_counts: dict[str, int] + sidechain_message_count: int + api_error_count: int + compaction_events: list[Any] diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..691b099 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,9 @@ +[tool.mypy] +python_version = "3.12" +strict = true +packages = ["api", "utils", "models"] +exclude = ["tests/"] + +[[tool.mypy.overrides]] +module = "tests.*" +ignore_errors = true diff --git a/requirements-dev.txt b/requirements-dev.txt index bdf88e0..970621b 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,2 +1,4 @@ -r requirements.txt pytest==9.0.2 +mypy==1.15.0 +types-Flask==1.1.6 diff --git a/tests/test_search.py b/tests/test_search.py new file mode 100644 index 0000000..999dcc8 --- /dev/null +++ b/tests/test_search.py @@ -0,0 +1,75 @@ +"""Tests for GET /api/search — query validation and limit parameter.""" + +from __future__ import annotations + +import json +import sys +from pathlib import Path + +import pytest + +REPO_ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(REPO_ROOT)) + +from flask import Flask # noqa: E402 + +from api.search import search_bp # noqa: E402 + + +@pytest.fixture +def client(tmp_path): + app = Flask(__name__) + app.config["TESTING"] = True + app.config["CLAUDE_PROJECTS_DIR"] = str(tmp_path) + app.config["EXCLUSION_RULES"] = [] + app.register_blueprint(search_bp) + return app.test_client() + + +def _write_searchable_session(tmp_path: Path, project: str, session_id: str, text: str): + """Minimal user message line so substring search can match.""" + proj = tmp_path / project + proj.mkdir(exist_ok=True) + entry = { + "type": "user", + "timestamp": "2026-05-19T12:00:00Z", + "message": {"role": "user", "content": text}, + } + (proj / f"{session_id}.jsonl").write_text( + json.dumps(entry) + "\n", encoding="utf-8" + ) + + +class TestSearchLimitValidation: + def test_limit_integer_string(self, client, tmp_path): + _write_searchable_session(tmp_path, "proj-a", "sess-1", "hello searchable world") + resp = client.get("/api/search?q=searchable&limit=10") + assert resp.status_code == 200 + data = resp.get_json() + assert isinstance(data, list) + assert len(data) >= 1 + + def test_limit_float_string_returns_400(self, client): + resp = client.get("/api/search?q=test&limit=1.5") + assert resp.status_code == 400 + body = resp.get_json() + assert "error" in body + assert "limit" in body["error"].lower() + + def test_limit_non_numeric_returns_400(self, client): + resp = client.get("/api/search?q=test&limit=abc") + assert resp.status_code == 400 + body = resp.get_json() + assert "error" in body + assert "limit" in body["error"].lower() + + def test_limit_default_when_omitted(self, client, tmp_path): + _write_searchable_session(tmp_path, "proj-a", "sess-1", "findme keyword here") + resp = client.get("/api/search?q=findme") + assert resp.status_code == 200 + assert isinstance(resp.get_json(), list) + + def test_empty_query_returns_empty_list(self, client): + resp = client.get("/api/search?q=") + assert resp.status_code == 200 + assert resp.get_json() == [] diff --git a/utils/exclusion_rules.py b/utils/exclusion_rules.py index 9f27bb9..5846c5c 100644 --- a/utils/exclusion_rules.py +++ b/utils/exclusion_rules.py @@ -28,6 +28,9 @@ import os import re from pathlib import Path +from typing import Any + +from models.session import SessionDict _logger = logging.getLogger(__name__) @@ -64,14 +67,14 @@ def resolve_exclusion_rules_path(cli_path: str | None) -> str | None: return None -def _tokenize_rule(line: str) -> list: +def _tokenize_rule(line: str) -> list[Any]: """ Tokenize a rule line into terms and operators. Returns a list where each element is ``"AND"``, ``"OR"``, or a ``(kind, value)`` tuple (kind is ``"word"`` or ``"phrase"``). """ - tokens = [] + tokens: list[Any] = [] rest = line.strip() while rest: m = re.match(r"\s+", rest) @@ -103,7 +106,7 @@ def _tokenize_rule(line: str) -> list: return tokens -def _term_matches(term: tuple, text: str) -> bool: +def _term_matches(term: tuple[str, str], text: str) -> bool: """Case-insensitive substring match for a single term.""" _kind, value = term if not value: @@ -111,7 +114,7 @@ def _term_matches(term: tuple, text: str) -> bool: return value.lower() in text.lower() -def _rule_matches(tokens: list, text: str) -> bool: +def _rule_matches(tokens: list[Any], text: str) -> bool: """ Evaluate a tokenized rule against *text*. @@ -120,8 +123,8 @@ def _rule_matches(tokens: list, text: str) -> bool: """ if not tokens: return False - clauses: list[list] = [] - current: list = [] + clauses: list[list[Any]] = [] + current: list[Any] = [] for t in tokens: if t == "OR": if current: @@ -143,7 +146,7 @@ def _rule_matches(tokens: list, text: str) -> bool: return False -def load_rules(path: str | None) -> list[list]: +def load_rules(path: str | None) -> list[list[Any]]: """ Load and parse the exclusion rule file at *path*. @@ -173,7 +176,7 @@ def load_rules(path: str | None) -> list[list]: return rules -def is_excluded_by_rules(rules: list[list], searchable_text: str) -> bool: +def is_excluded_by_rules(rules: list[list[Any]], searchable_text: str) -> bool: """ Return ``True`` if *searchable_text* matches any exclusion rule. @@ -211,7 +214,7 @@ def build_searchable_text( return "\n".join(p for p in parts if p) -def session_text_for_exclusion(session: dict) -> str: +def session_text_for_exclusion(session: SessionDict) -> str: """Extract a plain-text snippet from session messages for exclusion matching. Joins all non-empty, non-whitespace message ``text`` fields with blank @@ -229,8 +232,8 @@ def session_text_for_exclusion(session: dict) -> str: def is_session_excluded( - rules: list[list], - session: dict, + rules: list[list[Any]], + session: SessionDict, project_name: str | None, ) -> bool: """High-level helper: evaluate exclusion rules against a parsed session. diff --git a/utils/export_day_filter.py b/utils/export_day_filter.py index a616088..9916723 100644 --- a/utils/export_day_filter.py +++ b/utils/export_day_filter.py @@ -4,6 +4,10 @@ import logging from datetime import date, datetime, timezone +from typing import Any, Callable + +from models.project import ProjectDict, SessionListItemDict +from models.session import SessionDict logger = logging.getLogger(__name__) @@ -56,14 +60,18 @@ def day_overlaps_session(start: date, end: date, day: date) -> bool: def collect_sessions_for_latest_activity_day( - projects: list[dict], + projects: list[ProjectDict], *, - list_sessions, - parse_session, - is_session_excluded, - rules, + list_sessions: Callable[[str], list[SessionListItemDict]], + parse_session: Callable[[str], SessionDict], + is_session_excluded: Callable[..., bool], + rules: list[list[Any]], abort_on_parse_error: bool = False, -) -> tuple[date | None, list[tuple[dict, dict, dict, date, date]], int]: +) -> tuple[ + date | None, + list[tuple[ProjectDict, SessionListItemDict, SessionDict, date, date]], + int, +]: """Parse sessions in *projects*, skip untitled/excluded, return (D, rows, n_scanned). *D* is the latest session **end** calendar date (UTC) from successfully @@ -74,7 +82,7 @@ def collect_sessions_for_latest_activity_day( Each row is ``(project, sess_info, session, start_date, end_date)`` for sessions that overlap *D*. *n_scanned* counts every ``.jsonl`` file visited. """ - parsed: list[tuple[dict, dict, dict, date, date]] = [] + parsed: list[tuple[ProjectDict, SessionListItemDict, SessionDict, date, date]] = [] total_scan = 0 for project in projects: for sess_info in list_sessions(project["path"]): diff --git a/utils/export_state_store.py b/utils/export_state_store.py index 4040340..1fadc23 100644 --- a/utils/export_state_store.py +++ b/utils/export_state_store.py @@ -6,15 +6,23 @@ import os import tempfile import threading +from collections.abc import Iterator from contextlib import contextmanager +from typing import Any, cast +from models.export import ExportStateDict + +fcntl: Any try: - import fcntl + import fcntl as _fcntl_mod + fcntl = _fcntl_mod except ImportError: fcntl = None +msvcrt: Any try: - import msvcrt + import msvcrt as _msvcrt_mod + msvcrt = _msvcrt_mod except ImportError: msvcrt = None @@ -35,7 +43,7 @@ def _fallback_lock_for(path: str) -> threading.Lock: @contextmanager -def export_state_lock(state_path: str | None = None): +def export_state_lock(state_path: str | None = None) -> Iterator[None]: """Serialize export_state.json reads/writes across processes. POSIX: ``flock`` on a sidecar ``*.lock`` file. Windows: ``msvcrt.locking`` on @@ -63,10 +71,10 @@ def export_state_lock(state_path: str | None = None): if not os.path.exists(lock_path): with open(lock_path, "wb") as f: f.write(b"\x00") - lock_fp = open(lock_path, "r+b") + lock_fp = open(lock_path, "r+b") # type: ignore[assignment] try: if os.path.getsize(lock_path) == 0: - lock_fp.write(b"\x00") + lock_fp.write(b"\x00") # type: ignore[arg-type] lock_fp.flush() lock_fp.seek(0) msvcrt.locking(lock_fp.fileno(), msvcrt.LK_LOCK, 1) @@ -82,7 +90,7 @@ def export_state_lock(state_path: str | None = None): yield -def load_export_state_from_disk(state_path: str | None = None) -> dict: +def load_export_state_from_disk(state_path: str | None = None) -> ExportStateDict: """Load state from disk (call under :func:`export_state_lock` for consistency). Migrates legacy flat ``{session_id: mtime, ...}`` to ``{"sessions": ...}``. @@ -104,10 +112,12 @@ def load_export_state_from_disk(state_path: str | None = None) -> dict: if not isinstance(data.get("sessions"), dict): data = dict(data) data["sessions"] = {} - return data + return cast(ExportStateDict, data) -def atomic_write_export_state(state: dict, state_path: str | None = None) -> None: +def atomic_write_export_state( + state: ExportStateDict, state_path: str | None = None +) -> None: """Write *state* atomically (serialize, temp file + fsync + replace). Call under :func:`export_state_lock` matching *state_path*. diff --git a/utils/json_exporter.py b/utils/json_exporter.py index 84f6163..3439732 100644 --- a/utils/json_exporter.py +++ b/utils/json_exporter.py @@ -3,9 +3,17 @@ import json from datetime import datetime, timezone +from typing import Any +from models.session import SessionDict, SessionMetadataDict +from models.stats import SessionStatsDict -def session_to_json(session: dict, stats: dict = None, indent: int = 2) -> str: + +def session_to_json( + session: SessionDict, + stats: SessionStatsDict | None = None, + indent: int = 2, +) -> str: """Serialize a parsed session to a JSON string with schema versioning. Pass indent=None if you want compact output for piping.""" output = { @@ -20,9 +28,9 @@ def session_to_json(session: dict, stats: dict = None, indent: int = 2) -> str: return json.dumps(output, indent=indent, default=str, ensure_ascii=False) -def _serialize_metadata(meta: dict) -> dict: +def _serialize_metadata(meta: SessionMetadataDict) -> dict[str, Any]: """json.dumps chokes on sets, so convert them to sorted lists.""" - result = {} + result: dict[str, Any] = {} for key, val in meta.items(): if isinstance(val, set): result[key] = sorted(val) @@ -31,7 +39,7 @@ def _serialize_metadata(meta: dict) -> dict: return result -def _serialize_messages(messages: list) -> list: +def _serialize_messages(messages: list[Any]) -> list[dict[str, Any]]: """Same set-to-list cleanup, but for each message dict.""" out = [] for msg in messages: diff --git a/utils/jsonl_parser.py b/utils/jsonl_parser.py index cccc05d..51ce8d9 100644 --- a/utils/jsonl_parser.py +++ b/utils/jsonl_parser.py @@ -4,15 +4,18 @@ import json import os from datetime import datetime +from typing import Any, cast +from models.session import MessageDict, QuickSessionInfoDict, SessionDict -def parse_session(filepath: str) -> dict: + +def parse_session(filepath: str) -> SessionDict: """Main entry point. Reads every line from a .jsonl file and builds up a session dict with messages, metadata (tokens, models, tool counts), and file/command activity.""" session_id = os.path.basename(filepath).replace(".jsonl", "") - messages = [] - metadata = { + messages: list[MessageDict] = [] + metadata: dict[str, Any] = { "session_id": session_id, "models_used": set(), "total_input_tokens": 0, @@ -117,20 +120,25 @@ def parse_session(filepath: str) -> dict: title = _infer_title(messages) - return { - "session_id": session_id, - "title": title, - "messages": messages, - "metadata": metadata, - } + return cast( + SessionDict, + { + "session_id": session_id, + "title": title, + "messages": messages, + "metadata": metadata, + }, + ) -def _entry_message(entry: dict) -> dict: +def _entry_message(entry: dict[str, Any]) -> dict[str, Any]: m = entry.get("message") return m if isinstance(m, dict) else {} -def _process_user(entry: dict, messages: list, metadata: dict): +def _process_user( + entry: dict[str, Any], messages: list[MessageDict], metadata: dict[str, Any] +) -> None: """Pull out text, tool results, and session-level metadata (cwd, version, etc.) from a user entry.""" if metadata["version"] is None: @@ -172,7 +180,9 @@ def _process_user(entry: dict, messages: list, metadata: dict): }) -def _process_assistant(entry: dict, messages: list, metadata: dict): +def _process_assistant( + entry: dict[str, Any], messages: list[MessageDict], metadata: dict[str, Any] +) -> None: """Handle assistant responses -- splits content into text, thinking blocks, and tool_use calls, and accumulates token/model/tool stats.""" msg = _entry_message(entry) @@ -265,7 +275,9 @@ def _process_assistant(entry: dict, messages: list, metadata: dict): }) -def _process_system(entry: dict, messages: list, metadata: dict): +def _process_system( + entry: dict[str, Any], messages: list[MessageDict], metadata: dict[str, Any] +) -> None: """Handle system entries (mostly compact_boundary markers from context compaction).""" subtype = entry.get("subtype", "") @@ -290,7 +302,7 @@ def _process_system(entry: dict, messages: list, metadata: dict): }) -def _process_progress(entry: dict, messages: list): +def _process_progress(entry: dict[str, Any], messages: list[MessageDict]) -> None: """Capture progress entries -- streaming bash output, hook results, etc. These are noisy so we mostly just store them for the JSON export.""" data = entry.get("data", {}) @@ -309,7 +321,9 @@ def _process_progress(entry: dict, messages: list): }) -def _track_file_activity(tool_name: str, tool_input: dict, metadata: dict): +def _track_file_activity( + tool_name: str, tool_input: dict[str, Any], metadata: dict[str, Any] +) -> None: """Look at what each tool call did and record which files got touched, what commands got run, what URLs got fetched.""" fp = tool_input.get("file_path", "") @@ -329,11 +343,11 @@ def _track_file_activity(tool_name: str, tool_input: dict, metadata: dict): metadata["web_fetches"].append(url_or_query) -def _tool_result_pred_bash(tr: dict) -> bool: +def _tool_result_pred_bash(tr: dict[str, Any]) -> bool: return "stdout" in tr or "stderr" in tr -def _tool_result_build_bash(tr: dict, base: dict) -> dict: +def _tool_result_build_bash(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: result = dict(base) result["result_type"] = "bash" result["stdout"] = tr.get("stdout", "") @@ -345,13 +359,13 @@ def _tool_result_build_bash(tr: dict, base: dict) -> dict: return result -def _tool_result_pred_file_edit(tr: dict) -> bool: +def _tool_result_pred_file_edit(tr: dict[str, Any]) -> bool: return "structuredPatch" in tr or ( "filePath" in tr and "newString" in tr ) -def _tool_result_build_file_edit(tr: dict, base: dict) -> dict: +def _tool_result_build_file_edit(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: result = dict(base) result["result_type"] = "file_edit" result["file_path"] = tr.get("filePath", "") @@ -359,22 +373,22 @@ def _tool_result_build_file_edit(tr: dict, base: dict) -> dict: return result -def _tool_result_pred_file_write(tr: dict) -> bool: +def _tool_result_pred_file_write(tr: dict[str, Any]) -> bool: return "filePath" in tr and "content" in tr -def _tool_result_build_file_write(tr: dict, base: dict) -> dict: +def _tool_result_build_file_write(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: result = dict(base) result["result_type"] = "file_write" result["file_path"] = tr.get("filePath", "") return result -def _tool_result_pred_glob(tr: dict) -> bool: +def _tool_result_pred_glob(tr: dict[str, Any]) -> bool: return "filenames" in tr and isinstance(tr.get("filenames"), list) -def _tool_result_build_glob(tr: dict, base: dict) -> dict: +def _tool_result_build_glob(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: result = dict(base) filenames = tr["filenames"] result["result_type"] = "glob" @@ -385,11 +399,11 @@ def _tool_result_build_glob(tr: dict, base: dict) -> dict: return result -def _tool_result_pred_grep(tr: dict) -> bool: +def _tool_result_pred_grep(tr: dict[str, Any]) -> bool: return "mode" in tr and "numFiles" in tr -def _tool_result_build_grep(tr: dict, base: dict) -> dict: +def _tool_result_build_grep(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: result = dict(base) result["result_type"] = "grep" result["mode"] = tr.get("mode") @@ -402,11 +416,11 @@ def _tool_result_build_grep(tr: dict, base: dict) -> dict: return result -def _tool_result_pred_file_read(tr: dict) -> bool: +def _tool_result_pred_file_read(tr: dict[str, Any]) -> bool: return "file" in tr and isinstance(tr["file"], dict) -def _tool_result_build_file_read(tr: dict, base: dict) -> dict: +def _tool_result_build_file_read(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: result = dict(base) file_obj = tr["file"] result["result_type"] = "file_read" @@ -418,11 +432,11 @@ def _tool_result_build_file_read(tr: dict, base: dict) -> dict: return result -def _tool_result_pred_web_search(tr: dict) -> bool: +def _tool_result_pred_web_search(tr: dict[str, Any]) -> bool: return "query" in tr and "results" in tr -def _tool_result_build_web_search(tr: dict, base: dict) -> dict: +def _tool_result_build_web_search(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: result = dict(base) result["result_type"] = "web_search" result["query"] = tr.get("query", "") @@ -437,11 +451,11 @@ def _tool_result_build_web_search(tr: dict, base: dict) -> dict: return result -def _tool_result_pred_web_fetch(tr: dict) -> bool: +def _tool_result_pred_web_fetch(tr: dict[str, Any]) -> bool: return "url" in tr and "code" in tr -def _tool_result_build_web_fetch(tr: dict, base: dict) -> dict: +def _tool_result_build_web_fetch(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: result = dict(base) result["result_type"] = "web_fetch" result["url"] = tr.get("url", "") @@ -450,7 +464,7 @@ def _tool_result_build_web_fetch(tr: dict, base: dict) -> dict: return result -def _tool_result_pred_task_message(tr: dict) -> bool: +def _tool_result_pred_task_message(tr: dict[str, Any]) -> bool: # Broad: matches ``task_id`` OR ``message``. Runs before retrieval/completed/async # arms below — same short-circuit order as the original if/elif chain. Payloads # that also carry e.g. ``agentId`` still classify here if they have ``message``. @@ -458,7 +472,7 @@ def _tool_result_pred_task_message(tr: dict) -> bool: return "task_id" in tr or "message" in tr -def _tool_result_build_task_message(tr: dict, base: dict) -> dict: +def _tool_result_build_task_message(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: result = dict(base) result["result_type"] = "task" result["task_id"] = tr.get("task_id") @@ -466,11 +480,11 @@ def _tool_result_build_task_message(tr: dict, base: dict) -> dict: return result -def _tool_result_pred_task_retrieval(tr: dict) -> bool: +def _tool_result_pred_task_retrieval(tr: dict[str, Any]) -> bool: return "retrieval_status" in tr and "task" in tr -def _tool_result_build_task_retrieval(tr: dict, base: dict) -> dict: +def _tool_result_build_task_retrieval(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: result = dict(base) task_obj = tr["task"] if isinstance(tr["task"], dict) else {} result["result_type"] = "task" @@ -479,11 +493,11 @@ def _tool_result_build_task_retrieval(tr: dict, base: dict) -> dict: return result -def _tool_result_pred_task_completed(tr: dict) -> bool: +def _tool_result_pred_task_completed(tr: dict[str, Any]) -> bool: return "agentId" in tr and "totalDurationMs" in tr -def _tool_result_build_task_completed(tr: dict, base: dict) -> dict: +def _tool_result_build_task_completed(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: result = dict(base) result["result_type"] = "task" result["agent_id"] = tr.get("agentId") @@ -494,11 +508,11 @@ def _tool_result_build_task_completed(tr: dict, base: dict) -> dict: return result -def _tool_result_pred_task_async(tr: dict) -> bool: +def _tool_result_pred_task_async(tr: dict[str, Any]) -> bool: return "agentId" in tr and "isAsync" in tr -def _tool_result_build_task_async(tr: dict, base: dict) -> dict: +def _tool_result_build_task_async(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: result = dict(base) result["result_type"] = "task" result["agent_id"] = tr.get("agentId") @@ -507,11 +521,11 @@ def _tool_result_build_task_async(tr: dict, base: dict) -> dict: return result -def _tool_result_pred_todo_write(tr: dict) -> bool: +def _tool_result_pred_todo_write(tr: dict[str, Any]) -> bool: return "newTodos" in tr or "oldTodos" in tr -def _tool_result_build_todo_write(tr: dict, base: dict) -> dict: +def _tool_result_build_todo_write(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: result = dict(base) new_todos = tr.get("newTodos", []) result["result_type"] = "todo_write" @@ -520,11 +534,11 @@ def _tool_result_build_todo_write(tr: dict, base: dict) -> dict: return result -def _tool_result_pred_user_input(tr: dict) -> bool: +def _tool_result_pred_user_input(tr: dict[str, Any]) -> bool: return "questions" in tr and "answers" in tr -def _tool_result_build_user_input(tr: dict, base: dict) -> dict: +def _tool_result_build_user_input(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: result = dict(base) result["result_type"] = "user_input" result["questions"] = tr.get("questions", []) @@ -532,11 +546,11 @@ def _tool_result_build_user_input(tr: dict, base: dict) -> dict: return result -def _tool_result_pred_plan(tr: dict) -> bool: +def _tool_result_pred_plan(tr: dict[str, Any]) -> bool: return "plan" in tr and "filePath" in tr -def _tool_result_build_plan(tr: dict, base: dict) -> dict: +def _tool_result_build_plan(tr: dict[str, Any], base: dict[str, Any]) -> dict[str, Any]: result = dict(base) result["result_type"] = "plan" result["file_path"] = tr.get("filePath", "") @@ -572,7 +586,9 @@ def _tool_result_build_plan(tr: dict, base: dict) -> dict: ) -def _parse_tool_result(tool_result, slug: str | None = None) -> dict | None: +def _parse_tool_result( + tool_result: Any, slug: str | None = None +) -> dict[str, Any] | None: """Figure out what kind of tool result this is (bash, file edit, glob, etc.) by looking at which keys are present, since the JSONL doesn't always tag them. @@ -596,7 +612,7 @@ def _parse_tool_result(tool_result, slug: str | None = None) -> dict | None: return result -def quick_session_info(filepath: str) -> dict: +def quick_session_info(filepath: str) -> QuickSessionInfoDict: """Lightweight peek at a session file -- returns title and last_timestamp without fully parsing all messages. Much faster than parse_session() for large files. @@ -666,7 +682,7 @@ def quick_session_info(filepath: str) -> dict: } -def _normalize_content(content) -> list: +def _normalize_content(content: Any) -> list[dict[str, Any]]: """Content can be a plain string, a list of strings, or a list of typed blocks. Normalize everything into [{type, text}, ...] form.""" if isinstance(content, str): @@ -682,7 +698,7 @@ def _normalize_content(content) -> list: return [] -def _extract_text(content_parts) -> str: +def _extract_text(content_parts: Any) -> str: """Grab just the text blocks out of a content array, ignore tool_use/thinking.""" parts = _normalize_content(content_parts) texts = [] @@ -692,7 +708,7 @@ def _extract_text(content_parts) -> str: return "\n".join(texts) -def _extract_images(content_parts) -> list: +def _extract_images(content_parts: Any) -> list[dict[str, Any]]: """Pull base64 image blocks out of a content array. Also looks inside nested tool_result content blocks.""" parts = _normalize_content(content_parts) @@ -719,7 +735,7 @@ def _extract_images(content_parts) -> list: return images -def _infer_title(messages: list) -> str: +def _infer_title(messages: list[MessageDict]) -> str: """Use the first line of the first real user message as the session title.""" for msg in messages: if msg["role"] == "user" and msg.get("text"): diff --git a/utils/md_exporter.py b/utils/md_exporter.py index 4e66c88..55d777f 100644 --- a/utils/md_exporter.py +++ b/utils/md_exporter.py @@ -3,8 +3,13 @@ from datetime import datetime +from typing import Any -def session_to_markdown(session: dict, stats: dict = None) -> str: +from models.session import MessageDict, SessionDict +from models.stats import SessionStatsDict + + +def session_to_markdown(session: SessionDict, stats: SessionStatsDict | None = None) -> str: """Glue together frontmatter + header + summary + conversation body.""" frontmatter = _build_frontmatter(session) header = _build_header(session) @@ -18,7 +23,7 @@ def session_to_markdown(session: dict, stats: dict = None) -> str: return "\n".join(parts) -def _build_frontmatter(session: dict) -> str: +def _build_frontmatter(session: SessionDict) -> str: meta = session["metadata"] lines = ["---"] lines.append(f"title: \"{_escape_yaml(session['title'])}\"") @@ -82,7 +87,7 @@ def _build_frontmatter(session: dict) -> str: return "\n".join(lines) -def _build_header(session: dict) -> str: +def _build_header(session: SessionDict) -> str: meta = session["metadata"] lines = [] lines.append(f"\n# {session['title']}\n") @@ -111,7 +116,7 @@ def _build_header(session: dict) -> str: return "\n".join(lines) -def _build_summary(session: dict, stats: dict) -> str: +def _build_summary(session: SessionDict, stats: SessionStatsDict) -> str: """The summary block that goes right after the header -- cost, files table, command list, URLs, tool result breakdown.""" lines = ["## Session Summary\n"] @@ -177,7 +182,7 @@ def _build_summary(session: dict, stats: dict) -> str: return "\n".join(lines) -def _build_body(messages: list) -> str: +def _build_body(messages: list[MessageDict]) -> str: parts = [] for msg in messages: role = msg["role"] @@ -191,7 +196,7 @@ def _build_body(messages: list) -> str: return "\n".join(parts) -def _render_user(msg: dict) -> str: +def _render_user(msg: MessageDict) -> str: lines = [] lines.append("### User\n") @@ -201,9 +206,11 @@ def _render_user(msg: dict) -> str: if msg.get("slug"): lines.append(f"_Tool response: {msg['slug']}_\n") - if msg.get("images"): - for img in msg["images"]: - lines.append(f'User image\n') + for img in msg.get("images") or []: + lines.append( + f'\n' + ) if msg.get("text"): from utils.jsonl_parser import _strip_system_tags @@ -223,7 +230,7 @@ def _render_user(msg: dict) -> str: return "\n".join(lines) -def _render_assistant(msg: dict) -> str: +def _render_assistant(msg: MessageDict) -> str: lines = [] lines.append("### Assistant\n") @@ -245,24 +252,24 @@ def _render_assistant(msg: dict) -> str: if msg.get("is_api_error"): lines.append("**[API Error]**\n") - if msg.get("thinking"): + thinking = msg.get("thinking") + if thinking: lines.append("
Thinking\n") - lines.append(msg["thinking"]) + lines.append(thinking) lines.append("\n
\n") if msg.get("text"): from utils.jsonl_parser import _strip_system_tags lines.append(_strip_system_tags(msg["text"])) - if msg.get("tool_uses"): - for tool in msg["tool_uses"]: + for tool in msg.get("tool_uses") or []: lines.append(_render_tool_use(tool)) lines.append("\n---\n") return "\n".join(lines) -def _render_tool_use(tool: dict) -> str: +def _render_tool_use(tool: dict[str, Any]) -> str: name = tool.get("name", "unknown") inp = tool.get("input", {}) lines = [] @@ -320,7 +327,7 @@ def _render_tool_use(tool: dict) -> str: return "\n".join(lines) -def _render_tool_result(parsed: dict) -> str: +def _render_tool_result(parsed: dict[str, Any]) -> str: """Format a tool result nicely instead of dumping raw JSON.""" rt = parsed.get("result_type", "unknown") lines = [] @@ -423,7 +430,7 @@ def _render_tool_result(parsed: dict) -> str: return "\n".join(lines) -def _render_system(msg: dict) -> str: +def _render_system(msg: MessageDict) -> str: lines = [] subtype = msg.get("subtype", "") content = msg.get("content", "") @@ -436,8 +443,10 @@ def _render_system(msg: dict) -> str: return "\n".join(lines) -def _format_ts(ts: str) -> str: +def _format_ts(ts: str | None) -> str: """2024-01-15T10:30:00.123Z -> 2024-01-15 10:30:00.123 UTC""" + if not ts: + return "" try: dt = datetime.fromisoformat(ts.replace("Z", "+00:00")) ms = dt.microsecond // 1000 diff --git a/utils/session_path.py b/utils/session_path.py index 96ea4cf..4705e68 100644 --- a/utils/session_path.py +++ b/utils/session_path.py @@ -4,6 +4,8 @@ import os import platform +from models.project import ProjectDict, SessionListItemDict + def safe_join(base: str, *parts: str) -> str: """Join path components and verify the result stays under base. @@ -25,13 +27,13 @@ def get_claude_projects_dir() -> str: return os.path.join(home, ".claude", "projects") -def list_projects(base_dir: str | None = None) -> list[dict]: +def list_projects(base_dir: str | None = None) -> list[ProjectDict]: """Scan the projects dir and return info for each one that has .jsonl files.""" base = base_dir or get_claude_projects_dir() if not os.path.isdir(base): return [] - projects = [] + projects: list[ProjectDict] = [] for name in sorted(os.listdir(base)): project_dir = os.path.join(base, name) if not os.path.isdir(project_dir): @@ -53,7 +55,7 @@ def list_projects(base_dir: str | None = None) -> list[dict]: display_name = None for jf in jsonl_files: display_name = _get_display_name( - os.path.join(project_dir, jf), None + os.path.join(project_dir, jf), name ) if display_name: break @@ -69,7 +71,7 @@ def list_projects(base_dir: str | None = None) -> list[dict]: return projects -def _get_display_name(jsonl_path: str, fallback: str) -> str: +def _get_display_name(jsonl_path: str, fallback: str | None) -> str: """Peek at the first entry's cwd field to get a human-readable project path instead of the hashed directory name.""" import json @@ -86,15 +88,16 @@ def _get_display_name(jsonl_path: str, fallback: str) -> str: cwd = cwd.replace("\\", "/").rstrip("/") # Extract last folder name and capitalize first letter folder = cwd.rsplit("/", 1)[-1] - return folder[:1].upper() + folder[1:] if folder else cwd + out = folder[:1].upper() + folder[1:] if folder else cwd + return str(out) except Exception: pass - return fallback + return fallback or "" -def list_sessions(project_dir: str) -> list[dict]: +def list_sessions(project_dir: str) -> list[SessionListItemDict]: """Return id, path, size, mtime for each .jsonl file in a project dir.""" - sessions = [] + sessions: list[SessionListItemDict] = [] if not os.path.isdir(project_dir): return sessions diff --git a/utils/session_stats.py b/utils/session_stats.py index 2ce0ba7..3067ce9 100644 --- a/utils/session_stats.py +++ b/utils/session_stats.py @@ -2,6 +2,11 @@ activity, command success rates, conversation turns, etc. Bridges the raw parser output to the exporters.""" +from typing import Any, cast + +from models.session import MessageDict, SessionDict, SessionMetadataDict +from models.stats import FilesTouchedDict, SessionStatsDict + # Approximate pricing per 1M tokens (USD) as of early 2026. # Used for best-effort cost estimation only. _MODEL_PRICING = { @@ -12,7 +17,7 @@ } -def compute_stats(session: dict) -> dict: +def compute_stats(session: SessionDict) -> SessionStatsDict: """Build the full stats dict for a session. Everything the exporters and API endpoints need -- file lists, command history, cost, turn count.""" meta = session["metadata"] @@ -35,10 +40,10 @@ def compute_stats(session: dict) -> dict: "api_error_count": meta.get("api_errors", 0), "compaction_events": meta.get("compact_boundaries", []), } - return stats + return cast(SessionStatsDict, stats) -def _compute_files_touched(meta: dict) -> dict: +def _compute_files_touched(meta: SessionMetadataDict) -> FilesTouchedDict: """Split files into read-only, edited, and newly created buckets. Files that were both read and edited only show up under edited.""" read = set(meta.get("files_read", [])) @@ -53,15 +58,16 @@ def _compute_files_touched(meta: dict) -> dict: } -def _compute_commands_run(messages: list) -> list: +def _compute_commands_run(messages: list[MessageDict]) -> list[dict[str, Any]]: """Walk through messages and match up Bash tool_use calls with their subsequent tool_result entries to get exit codes and error status.""" commands = [] # Build a map of tool_use_id -> command from assistant messages pending_commands = {} for msg in messages: - if msg["role"] == "assistant" and msg.get("tool_uses"): - for tu in msg["tool_uses"]: + tool_uses = msg.get("tool_uses") or [] + if msg["role"] == "assistant" and tool_uses: + for tu in tool_uses: if tu["name"] == "Bash": cmd = tu["input"].get("command", "") if cmd: @@ -71,9 +77,8 @@ def _compute_commands_run(messages: list) -> list: } # Match tool results back to commands - if msg["role"] == "user" and msg.get("tool_result_parsed"): - trp = msg["tool_result_parsed"] - if trp.get("result_type") == "bash": + trp = msg.get("tool_result_parsed") + if msg["role"] == "user" and trp and trp.get("result_type") == "bash": # Try to find matching command by sequential order if pending_commands: first_id = next(iter(pending_commands)) @@ -95,7 +100,7 @@ def _compute_commands_run(messages: list) -> list: return commands -def _count_turns(messages: list) -> int: +def _count_turns(messages: list[MessageDict]) -> int: """Count how many times the user said something and got a reply back.""" turns = 0 prev_role = None @@ -108,7 +113,7 @@ def _count_turns(messages: list) -> int: return turns -def _estimate_cost(messages: list, meta: dict) -> float | None: +def _estimate_cost(messages: list[MessageDict], meta: SessionMetadataDict) -> float | None: """Rough cost estimate based on each message's token count and the model that generated it. Not exact -- doesn't account for caching discounts.""" total = 0.0 @@ -133,7 +138,7 @@ def _estimate_cost(messages: list, meta: dict) -> float | None: return round(total, 4) if has_data else None -def _get_pricing(model: str) -> tuple | None: +def _get_pricing(model: str) -> tuple[float, float] | None: """Find pricing by checking if 'opus', 'sonnet', or 'haiku' appears in the model name. Returns None for unknown models.""" model_lower = model.lower() @@ -143,7 +148,7 @@ def _get_pricing(model: str) -> tuple | None: return None -def _summarize_tool_results(messages: list) -> dict: +def _summarize_tool_results(messages: list[MessageDict]) -> dict[str, int]: """Count up how many tool results succeeded, failed, or got interrupted, broken down by tool type.""" summary = { @@ -192,7 +197,7 @@ def _summarize_tool_results(messages: list) -> dict: return summary -def _format_duration(seconds) -> str | None: +def _format_duration(seconds: float | int | None) -> str | None: """Turn seconds into something like '2h 15m' or '45s'.""" if seconds is None: return None From a38b16293bedf5867ffd8b1891c17678d6102b20 Mon Sep 17 00:00:00 2001 From: star-med Date: Tue, 19 May 2026 07:24:34 +0800 Subject: [PATCH 2/8] Fix PR review: stats exclusion, export_session errors, mypy 2.1, CI cache --- .github/workflows/ci.yml | 4 +++- api/export_api.py | 48 ++++++++++++++++++++++------------------ api/sessions.py | 3 +++ pyproject.toml | 4 ---- requirements-dev.txt | 3 +-- utils/session_path.py | 9 ++++---- 6 files changed, 38 insertions(+), 33 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 39d869d..28e7931 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -67,7 +67,9 @@ jobs: with: python-version: "3.12" cache: pip - cache-dependency-path: requirements-dev.txt + cache-dependency-path: | + requirements.txt + requirements-dev.txt - name: Install dev dependencies run: pip install -r requirements-dev.txt diff --git a/api/export_api.py b/api/export_api.py index b108816..7fd6e08 100644 --- a/api/export_api.py +++ b/api/export_api.py @@ -277,30 +277,36 @@ def export_session(project_name: str, session_id: str) -> FlaskReturn: return json_ok({"error": "Session not found"}), 404 fmt = request.args.get("format", "md") - session = parse_session(filepath) - rules = current_app.config.get("EXCLUSION_RULES") or [] - if is_session_excluded(rules, session, project_name): - return json_ok({"error": "Session not found"}), 404 - stats = compute_stats(session) - title_slug = slugify(session["title"], default="session") + try: + session = parse_session(filepath) + rules = current_app.config.get("EXCLUSION_RULES") or [] + if is_session_excluded(rules, session, project_name): + return json_ok({"error": "Session not found"}), 404 + stats = compute_stats(session) + title_slug = slugify(session["title"], default="session") + + if fmt == "json": + content = session_to_json(session, stats) + buf = io.BytesIO(content.encode("utf-8")) + buf.seek(0) + return send_file( + buf, + mimetype="application/json", + as_attachment=True, + download_name=f"{title_slug}.json", # type: ignore[call-arg] + ) - if fmt == "json": - content = session_to_json(session, stats) - buf = io.BytesIO(content.encode("utf-8")) + md = session_to_markdown(session, stats) + buf = io.BytesIO(md.encode("utf-8")) buf.seek(0) return send_file( buf, - mimetype="application/json", + mimetype="text/markdown", as_attachment=True, - download_name=f"{title_slug}.json", # type: ignore[call-arg] + download_name=f"{title_slug}.md", # type: ignore[call-arg] ) - - md = session_to_markdown(session, stats) - buf = io.BytesIO(md.encode("utf-8")) - buf.seek(0) - return send_file( - buf, - mimetype="text/markdown", - as_attachment=True, - download_name=f"{title_slug}.md", # type: ignore[call-arg] - ) + except Exception: + current_app.logger.exception( + "Failed to export session %s/%s", project_name, session_id + ) + return json_ok({"error": "Internal server error exporting session"}), 500 diff --git a/api/sessions.py b/api/sessions.py index 514eaf2..4ce2458 100644 --- a/api/sessions.py +++ b/api/sessions.py @@ -54,6 +54,9 @@ def get_session_stats(project_name: str, session_id: str) -> FlaskReturn: try: session = parse_session(filepath) + rules = current_app.config.get("EXCLUSION_RULES") or [] + if is_session_excluded(rules, session, project_name): + return json_ok({"error": "Session not found"}), 404 stats = compute_stats(session) return json_ok(stats) except Exception: diff --git a/pyproject.toml b/pyproject.toml index 691b099..900c0e2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,7 +3,3 @@ python_version = "3.12" strict = true packages = ["api", "utils", "models"] exclude = ["tests/"] - -[[tool.mypy.overrides]] -module = "tests.*" -ignore_errors = true diff --git a/requirements-dev.txt b/requirements-dev.txt index 970621b..7b65b78 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,4 +1,3 @@ -r requirements.txt pytest==9.0.2 -mypy==1.15.0 -types-Flask==1.1.6 +mypy==2.1.0 diff --git a/utils/session_path.py b/utils/session_path.py index 4705e68..37d6248 100644 --- a/utils/session_path.py +++ b/utils/session_path.py @@ -52,15 +52,14 @@ def list_projects(base_dir: str | None = None) -> list[ProjectDict]: latest_mtime, tz=timezone.utc ).isoformat() # Read cwd from sessions to get the real project path - display_name = None + display_name = name for jf in jsonl_files: - display_name = _get_display_name( + candidate = _get_display_name( os.path.join(project_dir, jf), name ) - if display_name: + if candidate: + display_name = candidate break - if not display_name: - display_name = name projects.append({ "name": name, "path": project_dir, From d5153adcb3506204777e4d4c9aea2e2c372e9163 Mon Sep 17 00:00:00 2001 From: star-med Date: Tue, 19 May 2026 07:28:31 +0800 Subject: [PATCH 3/8] Fix mypy CI errors in json_ok and export_api send_file stubs --- api/_flask_types.py | 4 ++-- api/export_api.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/api/_flask_types.py b/api/_flask_types.py index 70d8b02..bfb0316 100644 --- a/api/_flask_types.py +++ b/api/_flask_types.py @@ -1,6 +1,6 @@ """Shared Flask handler return types for mypy.""" -from typing import Any, Union, cast +from typing import Any, Union from flask import Response, jsonify @@ -9,4 +9,4 @@ def json_ok(*args: Any, **kwargs: Any) -> Response: """Typed wrapper around :func:`flask.jsonify`.""" - return cast(Response, jsonify(*args, **kwargs)) + return jsonify(*args, **kwargs) diff --git a/api/export_api.py b/api/export_api.py index 7fd6e08..6410ff0 100644 --- a/api/export_api.py +++ b/api/export_api.py @@ -255,7 +255,7 @@ def bulk_export() -> FlaskReturn: buf, mimetype="application/zip", as_attachment=True, - download_name=f"claude-code-export{suffix}-{date_tag}.zip", # type: ignore[call-arg] + download_name=f"claude-code-export{suffix}-{date_tag}.zip", ) @@ -293,7 +293,7 @@ def export_session(project_name: str, session_id: str) -> FlaskReturn: buf, mimetype="application/json", as_attachment=True, - download_name=f"{title_slug}.json", # type: ignore[call-arg] + download_name=f"{title_slug}.json", ) md = session_to_markdown(session, stats) @@ -303,7 +303,7 @@ def export_session(project_name: str, session_id: str) -> FlaskReturn: buf, mimetype="text/markdown", as_attachment=True, - download_name=f"{title_slug}.md", # type: ignore[call-arg] + download_name=f"{title_slug}.md", ) except Exception: current_app.logger.exception( From 62e557cc4e2159798929e21783494b725874c39d Mon Sep 17 00:00:00 2001 From: star-med Date: Tue, 19 May 2026 07:35:05 +0800 Subject: [PATCH 4/8] fix(session_path): continue JSONL scan when display name peek fails --- utils/session_path.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/utils/session_path.py b/utils/session_path.py index 37d6248..4dfad96 100644 --- a/utils/session_path.py +++ b/utils/session_path.py @@ -1,11 +1,15 @@ """Finds where Claude Code stores its .jsonl session files on disk and lists projects/sessions from that directory.""" +import json +import logging import os import platform from models.project import ProjectDict, SessionListItemDict +_logger = logging.getLogger(__name__) + def safe_join(base: str, *parts: str) -> str: """Join path components and verify the result stays under base. @@ -55,9 +59,9 @@ def list_projects(base_dir: str | None = None) -> list[ProjectDict]: display_name = name for jf in jsonl_files: candidate = _get_display_name( - os.path.join(project_dir, jf), name + os.path.join(project_dir, jf), None ) - if candidate: + if candidate is not None: display_name = candidate break projects.append({ @@ -70,10 +74,9 @@ def list_projects(base_dir: str | None = None) -> list[ProjectDict]: return projects -def _get_display_name(jsonl_path: str, fallback: str | None) -> str: +def _get_display_name(jsonl_path: str, fallback: str | None) -> str | None: """Peek at the first entry's cwd field to get a human-readable project path instead of the hashed directory name.""" - import json try: with open(jsonl_path, "r", encoding="utf-8", errors="replace") as f: for line in f: @@ -89,9 +92,11 @@ def _get_display_name(jsonl_path: str, fallback: str | None) -> str: folder = cwd.rsplit("/", 1)[-1] out = folder[:1].upper() + folder[1:] if folder else cwd return str(out) - except Exception: - pass - return fallback or "" + except (OSError, json.JSONDecodeError, UnicodeDecodeError) as exc: + _logger.warning( + "Failed to extract display name from %s: %s", jsonl_path, exc + ) + return fallback def list_sessions(project_dir: str) -> list[SessionListItemDict]: From 46452dde22c24a3d7eeeaebc2621c7e7d690a823 Mon Sep 17 00:00:00 2001 From: chen Date: Thu, 21 May 2026 05:42:38 +0800 Subject: [PATCH 5/8] Fix: Single quote on Attribute --- static/js/search.js | 9 ++++++++- static/js/sessions.js | 31 +++++++++++++++++++++++++++++-- 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/static/js/search.js b/static/js/search.js index a5e752d..f366fe9 100644 --- a/static/js/search.js +++ b/static/js/search.js @@ -27,6 +27,13 @@ export function showSearchPage() {
`; + document.getElementById('search-results').addEventListener('click', (e) => { + const result = e.target.closest('.search-result[data-project]'); + if (!result) return; + const project = result.getAttribute('data-project'); + const sessionId = result.getAttribute('data-session-id'); + window.location.hash = `#project/${encodeURIComponent(project)}/${encodeURIComponent(sessionId)}`; + }); document.getElementById('search-input').focus(); } @@ -56,7 +63,7 @@ export async function doSearch() { html += '
'; for (const r of results) { - html += `
+ html += `
${esc(r.title)} ${esc(r.project)} • ${esc(r.role)}
...${esc(r.snippet)}...
`; diff --git a/static/js/sessions.js b/static/js/sessions.js index 550debc..d66763d 100644 --- a/static/js/sessions.js +++ b/static/js/sessions.js @@ -4,6 +4,7 @@ import { state } from './shared/state.js'; import { esc, truncate, formatDate, formatTs, smoothSet, loadingBar, showToast, closeSidebar, setHamburgerVisible } from './shared/utils.js'; import { renderMarkdown, cleanContent } from './shared/markdown.js'; import { setWorkspaceMode } from './shared/theme.js'; +import { downloadSession } from './export.js'; // ==================== Workspace (split layout) ==================== @@ -54,7 +55,7 @@ export async function showWorkspace(projectName, selectedSessionId) { const errorClass = s.error ? ' sidebar-item-error' : ''; const errorDetail = s.error_detail ? `
${esc(s.error_detail)}
` : ''; const modelBadge = models ? `${esc(models)}` : ''; - sidebar += `
`; smoothSet(content, html); + bindSidebarSessionClicks(); loadingBar.done(); if (selectedSessionId) { @@ -94,6 +96,30 @@ export async function showWorkspace(projectName, selectedSessionId) { } } +function bindSidebarSessionClicks() { + const sidebar = document.getElementById('sidebar'); + if (!sidebar) return; + sidebar.addEventListener('click', (e) => { + const btn = e.target.closest('button.sidebar-item[data-session-id]'); + if (!btn) return; + const project = btn.getAttribute('data-project'); + const sessionId = btn.getAttribute('data-session-id'); + if (project == null || sessionId == null) return; + selectSession(project, sessionId); + }); +} + +function bindWorkspaceDownloadClick(wsActions) { + const btn = wsActions.querySelector('[data-download-session]'); + if (!btn) return; + btn.addEventListener('click', () => { + const project = btn.getAttribute('data-download-project'); + const sessionId = btn.getAttribute('data-download-session'); + if (project == null || sessionId == null) return; + downloadSession(project, sessionId); + }); +} + export function selectSession(projectName, sessionId) { closeSidebar(); window.location.hash = `#project/${encodeURIComponent(projectName)}/${encodeURIComponent(sessionId)}`; @@ -151,11 +177,12 @@ export async function loadSession(projectName, sessionId) { Copy All - `; + bindWorkspaceDownloadClick(wsActions); } html += `
From 41c1f09f52c336883c435a8e1caa6a5a9e1a974b Mon Sep 17 00:00:00 2001 From: chen Date: Thu, 21 May 2026 05:49:28 +0800 Subject: [PATCH 6/8] fix(ui): early theme apply and safer search/sidebar click handlers --- static/css/style.css | 6 ++++++ static/index.html | 23 +++++++++++++++++++++-- static/js/search.js | 2 ++ static/js/sessions.js | 3 ++- 4 files changed, 31 insertions(+), 3 deletions(-) diff --git a/static/css/style.css b/static/css/style.css index 0d74269..36a14d3 100644 --- a/static/css/style.css +++ b/static/css/style.css @@ -82,6 +82,12 @@ color-scheme: light; } +/* Theme toggle icons (set before JS module loads; applyTheme keeps them in sync) */ +[data-theme="dark"] #icon-moon { display: block; } +[data-theme="dark"] #icon-sun { display: none; } +[data-theme="light"] #icon-moon { display: none; } +[data-theme="light"] #icon-sun { display: block; } + /* ---------- Reset & Base ---------- */ *, *::before, *::after { box-sizing: border-box; margin: 0; padding: 0; } diff --git a/static/index.html b/static/index.html index ca46fc9..9623aaa 100644 --- a/static/index.html +++ b/static/index.html @@ -1,9 +1,18 @@ - + Claude Code Chat Browser +