From 6e3fed76f9f9c28703d89e8b90adb31b919d27b1 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 18 Jun 2026 12:13:01 +0000 Subject: [PATCH 1/3] feat: add exact search_files tool, agent ergonomics, and `coderag install` Hermes-inspired filesystem-search improvements to CodeRAG's MCP surface: - search_files: exact regex/glob search (ripgrep-backed, pure-Python fallback) as the literal-match complement to semantic search_code. Supports target content/files, output_mode content/files_only/count, context lines, pagination, and conservative secret redaction. Honours the same ignore rules as the indexer via a shared coderag/_ignore.py helper. - Agent ergonomics on the MCP tools: offset pagination on search_code, a loop guard that blocks repeated identical searches, and get_file line numbers + "did you mean?" filename suggestions. - coderag install: one-command registration of the MCP server into Claude Code (.mcp.json), Hermes (~/.hermes/config.yaml), and Codex (~/.codex/config.toml), with a sensible auto-detect default and an interactive wizard. Idempotent, with .bak backups and a --print dry-run. - Docs (README, AGENTS.md) and tests (test_fs_search, test_install, test_mcp). Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_011tgKDQJ8p7YLEzoMz32moC --- AGENTS.md | 3 + README.md | 32 ++- coderag/_ignore.py | 35 ++++ coderag/api.py | 32 ++- coderag/fs_search.py | 351 +++++++++++++++++++++++++++++++++ coderag/indexer.py | 13 +- coderag/install.py | 298 ++++++++++++++++++++++++++++ coderag/surfaces/cli.py | 129 ++++++++++++ coderag/surfaces/mcp_server.py | 161 +++++++++++++-- tests/test_fs_search.py | 114 +++++++++++ tests/test_install.py | 123 ++++++++++++ tests/test_mcp.py | 65 +++++- 12 files changed, 1320 insertions(+), 36 deletions(-) create mode 100644 coderag/_ignore.py create mode 100644 coderag/fs_search.py create mode 100644 coderag/install.py create mode 100644 tests/test_fs_search.py create mode 100644 tests/test_install.py diff --git a/AGENTS.md b/AGENTS.md index bf45334..15950e3 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -8,6 +8,9 @@ - `coderag/store/`: `sqlite_store.py` (source of truth + FTS5) and `vector_index.py` (FAISS Flat/IVF cache). - `coderag/retrieval/`: Hybrid dense + BM25 search fused with RRF. - `coderag/indexer.py`, `coderag/watch.py`: Incremental indexing and the debounced watcher. +- `coderag/_ignore.py`: Shared ignore-glob matching used by both the indexer and `fs_search`. +- `coderag/fs_search.py`: Exact regex/glob search (ripgrep-backed, Python fallback) — the literal-match complement to hybrid search; powers the MCP `search_files` tool. +- `coderag/install.py`: `coderag install` — registers the MCP server into Claude Code / Hermes / Codex. - `coderag/surfaces/`: `cli.py`, `http_api.py` (FastAPI), `webui.py`, `mcp_server.py` (MCP, for AI agents) — thin adapters over the facade. - `tests/`: pytest suite (offline by default via the `fake` provider; real model behind `-m integration`). - `example.env` → copy to `.env`; CI lives in `.github/`. diff --git a/README.md b/README.md index 23f3c0d..da727e3 100644 --- a/README.md +++ b/README.md @@ -115,6 +115,7 @@ coderag watch # index, then keep it live as files change coderag serve --port 8000 # run the HTTP API (needs [server]) coderag ui # launch the web UI (needs [ui]) coderag mcp # MCP server for AI agents (needs [mcp]); --all-text for any dir +coderag install [TARGET] # wire the MCP server into Claude Code / Hermes / Codex coderag status # index stats (files, chunks, model, index type) coderag eval --dataset d.jsonl --compare # retrieval quality: dense vs BM25 vs hybrid ``` @@ -185,10 +186,25 @@ coderag mcp --all-text # index ALL text files (docs/notes/config), not just It auto-indexes the working directory on startup (in the **background**, so it's responsive immediately) and keeps the index live with the watcher — zero manual steps. Tools exposed: -**`search_code`** (hybrid search, compact snippets + `path:line`), **`get_file`** (read a -precise range of an indexed file), **`index_status`** (coverage/freshness), and **`reindex`**. +**`search_code`** (hybrid semantic search, compact snippets + `path:line`), **`search_files`** +(exact regex/glob search, ripgrep-backed — the literal-match complement to `search_code`), +**`get_file`** (read a precise range of an indexed file, optional line numbers + "did you +mean?" hints), **`index_status`** (coverage/freshness), and **`reindex`**. -Wire it into an agent (the server defaults to the directory it's launched in): +#### One-command install (`coderag install`) + +Register the server into an agent without hand-editing any config: + +```bash +coderag install # auto-detect installed agents and wire them up +coderag install --wizard # interactive: pick agents, workspace, exposed tools +coderag install hermes --print # preview the exact config change without writing +``` + +Supported targets: **Claude Code** (`.mcp.json`), **Hermes** (`~/.hermes/config.yaml`, with +`tools.include`), and **Codex** (`~/.codex/config.toml`). It is idempotent and backs up any +file it changes to `*.bak`. The equivalent manual config (the server defaults to the +directory it's launched in): ```bash # Claude Code @@ -207,6 +223,16 @@ command = "coderag" args = ["mcp"] ``` +```yaml +# Hermes: ~/.hermes/config.yaml +mcp_servers: + coderag: + command: coderag + args: [mcp] + tools: + include: [search_code, search_files, get_file, index_status, reindex] +``` + > If `coderag` isn't on the launcher's PATH, use an absolute path (or `python -m coderag.surfaces.cli mcp`). > To index a directory other than where the client launches, add `"--watched-dir", "/abs/path"` to `args`. > Fast by default (local `bge-small`, no reranker); set `CODERAG_RERANK=1` to trade ~30 ms/query for sharper top results. diff --git a/coderag/_ignore.py b/coderag/_ignore.py new file mode 100644 index 0000000..14314f8 --- /dev/null +++ b/coderag/_ignore.py @@ -0,0 +1,35 @@ +"""Shared ignore-glob matching for indexing and exact filesystem search. + +Both the :class:`~coderag.indexer.Indexer` and the exact filesystem search +(:mod:`coderag.fs_search`) must skip the *same* set of paths — vendored deps, VCS +directories, build output — or the two would disagree about what "the workspace" is. +The matching rule lives here so both callers stay in lock-step instead of each +re-implementing it. +""" + +from __future__ import annotations + +import fnmatch +from typing import Iterable, Set + + +def ignore_dir_names(ignore_globs: Iterable[str]) -> Set[str]: + """Top-level directory names that can be pruned wholesale during a walk. + + Derived from ``"/*"`` globs (e.g. ``"node_modules/*"`` -> ``"node_modules"``) + so ``os.walk`` can drop the whole subtree without visiting every entry, and so a + *nested* ``node_modules`` is ignored too (matched by path component, not just prefix). + """ + return {g[:-2] for g in ignore_globs if g.endswith("/*") and "/" not in g[:-2]} + + +def is_ignored(rel: str, ignore_globs: Iterable[str], ignore_dirs: Set[str]) -> bool: + """True if the POSIX relative path ``rel`` should be skipped. + + A path is ignored if any of its components is an ignored directory name, or if the + whole relative path matches one of ``ignore_globs``. + """ + parts = rel.split("/") + if ignore_dirs.intersection(parts): + return True + return any(fnmatch.fnmatch(rel, g) for g in ignore_globs) diff --git a/coderag/api.py b/coderag/api.py index c81b6c9..8cf1cce 100644 --- a/coderag/api.py +++ b/coderag/api.py @@ -10,7 +10,7 @@ import logging import threading from pathlib import Path -from typing import TYPE_CHECKING, List, Optional, Union +from typing import TYPE_CHECKING, Any, List, Optional, Union from coderag._lines import split_lines from coderag.config import Config @@ -128,6 +128,36 @@ def search(self, query: str, top_k: Optional[int] = None) -> List[SearchHit]: """Hybrid (dense + lexical) search over the indexed codebase.""" return self.searcher.search(query, top_k or self.config.top_k) + def search_files(self, pattern: str, **kwargs: Any) -> dict: + """Exact regex/glob search over the workspace (the complement to ``search``). + + Thin pass-through to :func:`coderag.fs_search.search_files`, wired to the + configured ``watched_dir`` and ``ignore_globs`` so it sees exactly the same + files the indexer does. See that function for the keyword arguments. + """ + from coderag.fs_search import search_files + + return search_files( + self.config.watched_dir, + pattern, + ignore_globs=self.config.ignore_globs, + **kwargs, + ) + + def suggest_paths(self, path: Union[str, Path], n: int = 3) -> List[str]: + """Indexed paths whose name is closest to ``path`` — for "did you mean?" hints.""" + import difflib + + name = Path(str(path)).name + candidates = self.store.all_file_paths() + # Match on basename first (agents often pass a bare filename), then full path. + by_name = {c: Path(c).name for c in candidates} + close = difflib.get_close_matches(name, list(by_name.values()), n=n, cutoff=0.5) + hits = [c for c, base in by_name.items() if base in close] + if not hits: + hits = difflib.get_close_matches(str(path), candidates, n=n, cutoff=0.4) + return hits[:n] + def get_file( self, path: Union[str, Path], diff --git a/coderag/fs_search.py b/coderag/fs_search.py new file mode 100644 index 0000000..7225b97 --- /dev/null +++ b/coderag/fs_search.py @@ -0,0 +1,351 @@ +"""Exact filesystem search — the regex/glob complement to semantic ``search_code``. + +CodeRAG's hybrid index is great at "find this by *meaning*", but an agent still needs +the other half of the job: "find every literal ``raise TimeoutError``", "list the files +matching ``*_test.py``". That is exactly what coding agents otherwise shell out to +``grep``/``rg``/``find`` for. This module gives them an in-process, ignore-aware exact +search instead — modelled on the Hermes agent's ``search_files`` tool (ripgrep-backed, +``target`` content/files, ``output_mode`` content/files_only/count, context lines, +pagination, secret redaction). + +Design: candidate files are always enumerated in Python, honouring CodeRAG's own +``ignore_globs`` via :mod:`coderag._ignore` (so the search sees exactly the same +workspace the indexer does). When ripgrep is on PATH it scans that explicit file list +for the content case — a genuine speed-up with *no* divergence in which files are +searched, since rg is handed the paths directly. Without ripgrep, a pure-Python scan +produces identical results; that fallback is what the test-suite exercises so CI never +depends on rg being installed. +""" + +from __future__ import annotations + +import json +import os +import re +import shutil +import subprocess +from dataclasses import dataclass, field +from fnmatch import fnmatch +from pathlib import Path +from typing import Dict, Iterator, List, Optional, Sequence, Tuple + +from coderag._ignore import ignore_dir_names, is_ignored +from coderag._lines import split_lines +from coderag.config import DEFAULT_IGNORE_GLOBS + +DEFAULT_LIMIT = 50 +_RG_BATCH = 400 # files per ripgrep invocation, to stay under arg-length limits +_MAX_FILE_BYTES = 1_000_000 # skip files larger than this when scanning content + +# Conservative secret redaction. Two flavours: "keyed" patterns mask only the value that +# follows a credential-ish key, so searching for the word "token" still shows the line; +# "standalone" patterns mask a whole well-known credential shape. +_KEYED_SECRET = re.compile( + r"(?i)\b(api[_-]?key|secret|token|password|passwd|pwd|access[_-]?key)\b" + r"(\s*[:=]\s*['\"]?)([^\s'\"]{6,})" +) +_STANDALONE_SECRETS = ( + re.compile(r"AKIA[0-9A-Z]{16}"), # AWS access key id + re.compile(r"(?i)\bbearer\s+[A-Za-z0-9._\-]{12,}"), + re.compile(r"-----BEGIN[A-Z ]*PRIVATE KEY-----"), +) + + +def redact_secrets(text: str) -> str: + """Mask obvious credential values in a line, conservatively.""" + out = _KEYED_SECRET.sub(lambda m: f"{m.group(1)}{m.group(2)}***", text) + for pat in _STANDALONE_SECRETS: + out = pat.sub("***", out) + return out + + +@dataclass(slots=True) +class _ContentMatch: + path: str # POSIX path relative to root + line_number: int # 1-based + line: str + before: List[str] = field(default_factory=list) + after: List[str] = field(default_factory=list) + + +def _rg_available() -> bool: + """Whether ripgrep is on PATH. Indirected so tests can force the Python path.""" + return shutil.which("rg") is not None + + +def _iter_files(root: Path, ignore_globs: Sequence[str]) -> Iterator[Tuple[Path, str]]: + """Yield ``(absolute_path, posix_rel)`` for every non-ignored file under ``root``.""" + ignore_dirs = ignore_dir_names(ignore_globs) + for dirpath, dirnames, filenames in os.walk(root): + dirnames[:] = [d for d in dirnames if d not in ignore_dirs] + for name in filenames: + abs_path = Path(dirpath) / name + try: + rel = abs_path.relative_to(root).as_posix() + except ValueError: # pragma: no cover - defensive + continue + if is_ignored(rel, ignore_globs, ignore_dirs): + continue + yield abs_path, rel + + +def _glob_matches(rel: str, glob: str) -> bool: + """Match a glob against the full relative path or just the basename (``*.py``).""" + return fnmatch(rel, glob) or fnmatch(rel.rsplit("/", 1)[-1], glob) + + +def _read_text(abs_path: Path, max_file_bytes: int) -> Optional[str]: + """Read a file as text, skipping ones that are too large or binary (NUL sniff).""" + try: + data = abs_path.read_bytes() + except OSError: + return None + if len(data) > max_file_bytes or b"\x00" in data[:8192]: + return None + return data.decode("utf-8", errors="replace") + + +def _match_python( + files: Sequence[Tuple[Path, str]], + regex: "re.Pattern[str]", + context: int, + max_file_bytes: int, +) -> List[_ContentMatch]: + matches: List[_ContentMatch] = [] + for abs_path, rel in files: + text = _read_text(abs_path, max_file_bytes) + if text is None: + continue + lines = split_lines(text) + for i, line in enumerate(lines): + if regex.search(line): + before = lines[max(0, i - context) : i] if context else [] + after = lines[i + 1 : i + 1 + context] if context else [] + matches.append(_ContentMatch(rel, i + 1, line, before, after)) + return matches + + +def _match_ripgrep( + files: Sequence[Tuple[Path, str]], + pattern: str, + ignore_case: bool, +) -> List[_ContentMatch]: + """Scan an explicit file list with ripgrep (context-free fast path). + + Files are passed by path, so ripgrep's own ignore rules never apply — the set of + searched files is exactly what :func:`_iter_files` produced. Raises on any failure + so the caller can fall back to the Python scan. + """ + rel_by_abs = {str(abs_path): rel for abs_path, rel in files} + matches: List[_ContentMatch] = [] + paths = list(rel_by_abs.keys()) + for start in range(0, len(paths), _RG_BATCH): + batch = paths[start : start + _RG_BATCH] + cmd = ["rg", "--json", "-n", "--no-config"] + if ignore_case: + cmd.append("-i") + cmd += ["-e", pattern, "--", *batch] + proc = subprocess.run(cmd, capture_output=True, text=True, check=False) + # rg exits 1 when there are simply no matches; 2+ is a real error. + if proc.returncode >= 2: + raise RuntimeError(proc.stderr.strip() or "ripgrep failed") + for raw in proc.stdout.splitlines(): + if not raw: + continue + event = json.loads(raw) + if event.get("type") != "match": + continue + data = event["data"] + abs_text = data["path"]["text"] + rel = rel_by_abs.get(abs_text, abs_text) + line = data["lines"]["text"].rstrip("\n") + matches.append(_ContentMatch(rel, data["line_number"], line)) + matches.sort(key=lambda m: (m.path, m.line_number)) + return matches + + +def _paginate(items: List, offset: int, limit: int) -> Tuple[List, bool, Optional[int]]: + total = len(items) + page = items[offset : offset + limit] if limit > 0 else items[offset:] + truncated = limit > 0 and offset + limit < total + next_offset = offset + limit if truncated else None + return page, truncated, next_offset + + +def _shape_content( + matches: List[_ContentMatch], + *, + output_mode: str, + offset: int, + limit: int, + context: int, + redact: bool, +) -> Tuple[List[Dict], int]: + """Project raw content matches into the requested output_mode + page.""" + if output_mode == "files_only": + seen: List[str] = [] + for m in matches: + if m.path not in seen: + seen.append(m.path) + page, _, _ = _paginate(seen, offset, limit) + return [{"path": p} for p in page], len(seen) + + if output_mode == "count": + counts: Dict[str, int] = {} + for m in matches: + counts[m.path] = counts.get(m.path, 0) + 1 + rows = [{"path": p, "count": counts[p]} for p in sorted(counts)] + page, _, _ = _paginate(rows, offset, limit) + return page, len(rows) + + # default: "content" + page, _, _ = _paginate(matches, offset, limit) + rows = [] + for m in page: + row: Dict = { + "location": f"{m.path}:{m.line_number}", + "path": m.path, + "line_number": m.line_number, + "line": redact_secrets(m.line) if redact else m.line, + } + if context: + row["before"] = [redact_secrets(x) if redact else x for x in m.before] + row["after"] = [redact_secrets(x) if redact else x for x in m.after] + rows.append(row) + return rows, len(matches) + + +def search_files( + root: os.PathLike, + pattern: str, + *, + target: str = "content", + file_glob: Optional[str] = None, + output_mode: str = "content", + context: int = 0, + limit: int = DEFAULT_LIMIT, + offset: int = 0, + ignore_globs: Sequence[str] = DEFAULT_IGNORE_GLOBS, + ignore_case: bool = False, + max_file_bytes: int = _MAX_FILE_BYTES, + redact: bool = True, + use_ripgrep: bool = True, +) -> Dict: + """Exact regex/glob search over the workspace, honouring CodeRAG's ignore rules. + + Args: + root: Workspace root to search under. + pattern: A regex (``target="content"``) or a filename glob (``target="files"``). + target: ``"content"`` (regex inside files) or ``"files"`` (find by name). + file_glob: For content search, restrict to files matching this glob (e.g. ``*.py``). + output_mode: ``"content"`` | ``"files_only"`` | ``"count"`` (content target only). + context: Lines of context around each match (content + Python path only). + limit: Page size (``<= 0`` means no limit). + offset: Page offset, for paginating large result sets. + ignore_globs: Ignore patterns; defaults to CodeRAG's standard set. + ignore_case: Case-insensitive matching. + max_file_bytes: Skip files larger than this when scanning content. + redact: Mask obvious credential values in returned lines. + use_ripgrep: Use ripgrep for the content fast path when available. + + Returns a JSON-able dict with ``results`` plus pagination metadata. + """ + root_path = Path(root).resolve() + if target not in ("content", "files"): + return {"error": f"unknown target {target!r} (use 'content' or 'files')"} + if output_mode not in ("content", "files_only", "count"): + return {"error": f"unknown output_mode {output_mode!r}"} + if offset < 0: + offset = 0 + + if target == "files": + rels = sorted( + rel + for _, rel in _iter_files(root_path, ignore_globs) + if _glob_matches(rel, pattern) + ) + page, truncated, next_offset = _paginate(rels, offset, limit) + return _envelope( + pattern, + target, + "files", + [{"path": p} for p in page], + len(rels), + offset, + next_offset, + truncated, + ripgrep=False, + ) + + # target == "content" + try: + regex = re.compile(pattern, re.IGNORECASE if ignore_case else 0) + except re.error as exc: + return {"error": f"invalid regex: {exc}", "pattern": pattern} + + files = [ + (abs_path, rel) + for abs_path, rel in _iter_files(root_path, ignore_globs) + if file_glob is None or _glob_matches(rel, file_glob) + ] + + used_rg = False + matches: Optional[List[_ContentMatch]] = None + if use_ripgrep and context == 0 and files and _rg_available(): + try: + matches = _match_ripgrep(files, pattern, ignore_case) + used_rg = True + except Exception: # fall back to the always-correct Python scan + matches = None + if matches is None: + matches = _match_python(files, regex, context, max_file_bytes) + + results, total = _shape_content( + matches, + output_mode=output_mode, + offset=offset, + limit=limit, + context=context, + redact=redact, + ) + _, truncated, next_offset = _paginate(list(range(total)), offset, limit) + return _envelope( + pattern, + target, + output_mode, + results, + total, + offset, + next_offset, + truncated, + ripgrep=used_rg, + ) + + +def _envelope( + pattern: str, + target: str, + output_mode: str, + results: List[Dict], + total: int, + offset: int, + next_offset: Optional[int], + truncated: bool, + *, + ripgrep: bool, +) -> Dict: + env: Dict = { + "pattern": pattern, + "target": target, + "output_mode": output_mode, + "count": len(results), + "total": total, + "offset": offset, + "truncated": truncated, + "ripgrep": ripgrep, + "results": results, + } + if truncated: + env["next_offset"] = next_offset + env["hint"] = f"Results truncated. Use offset={next_offset} to see more." + return env diff --git a/coderag/indexer.py b/coderag/indexer.py index 0970e40..9039645 100644 --- a/coderag/indexer.py +++ b/coderag/indexer.py @@ -8,7 +8,6 @@ from __future__ import annotations -import fnmatch import hashlib import logging import os @@ -18,6 +17,7 @@ import numpy as np +from coderag._ignore import ignore_dir_names, is_ignored from coderag.chunking import chunk_file from coderag.chunking.languages import detect_language from coderag.config import Config @@ -50,11 +50,7 @@ def __init__( self.provider = provider self.store = store self.vectors = vectors - self._ignore_dirs = { - g[:-2] - for g in config.ignore_globs - if g.endswith("/*") and "/" not in g[:-2] - } + self._ignore_dirs = ignore_dir_names(config.ignore_globs) # --- public --- @@ -242,7 +238,4 @@ def _rel(abs_path: Path, root: Path) -> Optional[str]: return None def _ignored(self, rel: str) -> bool: - parts = rel.split("/") - if self._ignore_dirs.intersection(parts): - return True - return any(fnmatch.fnmatch(rel, g) for g in self.config.ignore_globs) + return is_ignored(rel, self.config.ignore_globs, self._ignore_dirs) diff --git a/coderag/install.py b/coderag/install.py new file mode 100644 index 0000000..8ffeda6 --- /dev/null +++ b/coderag/install.py @@ -0,0 +1,298 @@ +"""One-command registration of CodeRAG's MCP server into coding agents. + +``coderag install`` wires the ``coderag mcp`` server into an agent's config so the agent +gains CodeRAG's search tools with no hand-editing. Three targets are supported, each with +its own config format: + +* **claude** — Claude Code: ``.mcp.json`` in the current repo (``mcpServers.coderag``). +* **hermes** — Nous Research's Hermes agent: ``~/.hermes/config.yaml`` + (``mcp_servers.coderag`` with ``tools.include``). +* **codex** — OpenAI Codex: ``~/.codex/config.toml`` (``[mcp_servers.coderag]``). + +Design goals: a stable common-sense default (auto-detect the installed agents, sensible +defaults, idempotent, backups) plus an interactive wizard for customisation. Everything is +file-based and idempotent — re-running never duplicates an entry, and an existing file is +backed up to ``*.bak`` before it is rewritten. YAML support is optional (PyYAML); when it +is missing we fall back to printing the exact snippet to paste. +""" + +from __future__ import annotations + +import json +import shutil +import sys +import tomllib +from dataclasses import dataclass, field +from pathlib import Path +from typing import Dict, List, Optional, Tuple + +try: # PyYAML is optional — only needed for the Hermes target. + import yaml +except ImportError: # pragma: no cover - exercised via the manual-fallback path + yaml = None + +TARGETS = ("claude", "hermes", "codex") +DEFAULT_TOOLS = ["search_code", "search_files", "get_file", "index_status", "reindex"] + + +@dataclass +class InstallResult: + """Outcome of installing one target.""" + + target: str + path: str + action: ( + str # created | updated | unchanged | appended | manual | would-write | error + ) + detail: str = "" + + @property + def changed(self) -> bool: + return self.action in ("created", "updated", "appended") + + +@dataclass +class Plan: + """A chosen install (one target), produced by the wizard or the CLI args.""" + + target: str + watched_dir: Optional[Path] = None + scope: str = "project" + tools: List[str] = field(default_factory=lambda: list(DEFAULT_TOOLS)) + + +# --- shared helpers ------------------------------------------------------------------- + + +def _server_invocation(watched_dir: Optional[Path]) -> Tuple[str, List[str]]: + """How an agent should launch the server: ``coderag mcp`` if on PATH, else ``python -m``. + + Mirrors the README's launcher note (``README.md:210``). When ``watched_dir`` is given, + a ``--watched-dir`` arg is appended so a globally-configured agent indexes the right + tree regardless of where it was launched. + """ + if shutil.which("coderag"): + command, args = "coderag", ["mcp"] + else: + command, args = sys.executable, ["-m", "coderag.surfaces.cli", "mcp"] + if watched_dir is not None: + args = args + ["--watched-dir", str(Path(watched_dir).resolve())] + return command, args + + +def _backup(path: Path) -> None: + shutil.copy2(path, path.with_suffix(path.suffix + ".bak")) + + +def detect_targets() -> List[str]: + """Agents that appear to be installed on this machine, for the zero-arg default.""" + found: List[str] = [] + if shutil.which("claude") or (Path.cwd() / ".mcp.json").exists(): + found.append("claude") + if (Path.home() / ".hermes").exists(): + found.append("hermes") + if (Path.home() / ".codex").exists(): + found.append("codex") + return found + + +# --- per-target writers --------------------------------------------------------------- + + +def _install_claude(watched_dir: Optional[Path], dry_run: bool) -> InstallResult: + path = Path.cwd() / ".mcp.json" + command, args = _server_invocation(watched_dir) + server = {"command": command, "args": args} + + existed = path.exists() + existing: Dict = {} + if existed: + try: + existing = json.loads(path.read_text(encoding="utf-8")) or {} + except json.JSONDecodeError as exc: + return InstallResult("claude", str(path), "error", f"invalid JSON: {exc}") + servers = dict(existing.get("mcpServers") or {}) + if servers.get("coderag") == server: + return InstallResult("claude", str(path), "unchanged") + + servers["coderag"] = server + desired = {**existing, "mcpServers": servers} + rendered = json.dumps(desired, indent=2) + if dry_run: + return InstallResult("claude", str(path), "would-write", rendered) + if existed: + _backup(path) + path.write_text(rendered + "\n", encoding="utf-8") + return InstallResult("claude", str(path), "updated" if existed else "created") + + +def _install_hermes( + watched_dir: Optional[Path], tools: List[str], dry_run: bool +) -> InstallResult: + path = Path.home() / ".hermes" / "config.yaml" + wd = watched_dir if watched_dir is not None else Path.cwd() + command, args = _server_invocation(wd) + server = {"command": command, "args": args, "tools": {"include": list(tools)}} + + if yaml is None: + snippet = _yaml_snippet(server) + return InstallResult( + "hermes", + str(path), + "manual", + "PyYAML not installed (pip install pyyaml). Add this under mcp_servers:\n" + + snippet, + ) + + existed = path.exists() + existing: Dict = {} + if existed: + existing = yaml.safe_load(path.read_text(encoding="utf-8")) or {} + servers = dict(existing.get("mcp_servers") or {}) + if servers.get("coderag") == server: + return InstallResult("hermes", str(path), "unchanged") + + servers["coderag"] = server + existing["mcp_servers"] = servers + rendered = yaml.safe_dump(existing, sort_keys=False, default_flow_style=False) + if dry_run: + return InstallResult("hermes", str(path), "would-write", rendered) + path.parent.mkdir(parents=True, exist_ok=True) + if existed: + _backup(path) + path.write_text(rendered, encoding="utf-8") + return InstallResult("hermes", str(path), "updated" if existed else "created") + + +def _install_codex(watched_dir: Optional[Path], dry_run: bool) -> InstallResult: + path = Path.home() / ".codex" / "config.toml" + wd = watched_dir if watched_dir is not None else Path.cwd() + command, args = _server_invocation(wd) + desired = {"command": command, "args": args} + snippet = _toml_snippet(command, args) + + existing: Dict = {} + if path.exists(): + try: + existing = tomllib.loads(path.read_text(encoding="utf-8")) + except tomllib.TOMLDecodeError as exc: + return InstallResult("codex", str(path), "error", f"invalid TOML: {exc}") + current = (existing.get("mcp_servers") or {}).get("coderag") + if current == desired: + return InstallResult("codex", str(path), "unchanged") + # We append rather than rewrite (no TOML writer dep, and appending preserves comments). + # If a *different* coderag table already exists, appending would duplicate it, so we + # surface a manual edit instead of corrupting the file. + if current is not None: + return InstallResult( + "codex", + str(path), + "manual", + "An mcp_servers.coderag entry already exists with different settings. " + "Replace it with:\n" + snippet, + ) + if dry_run: + return InstallResult("codex", str(path), "would-write", snippet) + path.parent.mkdir(parents=True, exist_ok=True) + if path.exists(): + _backup(path) + with path.open("a", encoding="utf-8") as fh: + fh.write("\n" + snippet) + return InstallResult("codex", str(path), "appended") + path.write_text(snippet, encoding="utf-8") + return InstallResult("codex", str(path), "created") + + +def _toml_snippet(command: str, args: List[str]) -> str: + args_toml = "[" + ", ".join(json.dumps(a) for a in args) + "]" + return ( + f"[mcp_servers.coderag]\ncommand = {json.dumps(command)}\nargs = {args_toml}\n" + ) + + +def _yaml_snippet(server: Dict) -> str: + if yaml is not None: + return yaml.safe_dump({"coderag": server}, sort_keys=False) + return json.dumps({"coderag": server}, indent=2) # pragma: no cover + + +def install( + target: str, + *, + watched_dir: Optional[Path] = None, + scope: str = "project", + tools: Optional[List[str]] = None, + dry_run: bool = False, +) -> InstallResult: + """Register CodeRAG's MCP server for ``target`` (``claude``|``hermes``|``codex``).""" + tools = tools or list(DEFAULT_TOOLS) + if target == "claude": + return _install_claude(watched_dir, dry_run) + if target == "hermes": + return _install_hermes(watched_dir, tools, dry_run) + if target == "codex": + return _install_codex(watched_dir, dry_run) + return InstallResult(target, "", "error", f"unknown target {target!r}") + + +# --- interactive wizard --------------------------------------------------------------- + + +def _ask(prompt: str, default: str) -> str: + raw = input(f"{prompt} [{default}]: ").strip() + return raw or default + + +def _ask_yes_no(prompt: str, default: bool = True) -> bool: + hint = "Y/n" if default else "y/N" + raw = input(f"{prompt} [{hint}]: ").strip().lower() + if not raw: + return default + return raw[0] == "y" + + +def _ask_targets(detected: List[str]) -> List[str]: + default = detected or list(TARGETS) + print("\nWhich agents should CodeRAG be installed for?") + for i, t in enumerate(TARGETS, 1): + mark = " (detected)" if t in detected else "" + print(f" {i}. {t}{mark}") + default_str = ",".join(str(TARGETS.index(t) + 1) for t in default) + raw = _ask("Enter numbers (comma-separated)", default_str) + chosen: List[str] = [] + for token in raw.replace(" ", "").split(","): + if token.isdigit() and 1 <= int(token) <= len(TARGETS): + t = TARGETS[int(token) - 1] + if t not in chosen: + chosen.append(t) + return chosen or default + + +def _ask_tools() -> List[str]: + if _ask_yes_no("Expose all CodeRAG tools?", True): + return list(DEFAULT_TOOLS) + print("Select tools to expose:") + for i, t in enumerate(DEFAULT_TOOLS, 1): + print(f" {i}. {t}") + raw = _ask("Enter numbers (comma-separated)", "1,2,3,4,5") + picked = [ + DEFAULT_TOOLS[int(tok) - 1] + for tok in raw.replace(" ", "").split(",") + if tok.isdigit() and 1 <= int(tok) <= len(DEFAULT_TOOLS) + ] + return picked or list(DEFAULT_TOOLS) + + +def run_wizard(detected: List[str], default_watched: Path) -> List[Plan]: + """Collect install choices interactively. Returns one :class:`Plan` per chosen target.""" + print("CodeRAG install wizard\n----------------------") + targets = _ask_targets(detected) + watched = Path( + _ask("Workspace directory to index", str(default_watched)) + ).expanduser() + plans: List[Plan] = [] + for t in targets: + # Only Hermes supports per-server tool filtering in its config. + tools = _ask_tools() if t == "hermes" else list(DEFAULT_TOOLS) + plans.append(Plan(target=t, watched_dir=watched, tools=tools)) + return plans diff --git a/coderag/surfaces/cli.py b/coderag/surfaces/cli.py index 4d1c0d2..fc565ff 100644 --- a/coderag/surfaces/cli.py +++ b/coderag/surfaces/cli.py @@ -219,6 +219,101 @@ def cmd_mcp(args: argparse.Namespace) -> int: return 0 +def _confirm(prompt: str) -> bool: + return input(f"{prompt} [y/N]: ").strip().lower().startswith("y") + + +_NEXT_STEPS = { + "claude": "Restart Claude Code (or run `claude mcp list`) to load coderag.", + "hermes": "Restart Hermes (or run `hermes mcp list`) to load coderag.", + "codex": "Restart Codex to load the coderag MCP server.", +} + + +def cmd_install(args: argparse.Namespace) -> int: + """Register CodeRAG's MCP server in an AI agent (Claude Code, Hermes, Codex).""" + from coderag import install as inst + + default_watched = ( + Path(args.watched_dir).expanduser() if args.watched_dir else Path.cwd() + ) + explicit_watched = Path(args.watched_dir).expanduser() if args.watched_dir else None + interactive = sys.stdin.isatty() + + # Bare `coderag install` on a terminal → the friendly wizard; otherwise the stable + # auto-detect default (and never prompt when there is no TTY, e.g. in CI). + use_wizard = args.wizard or ( + args.target is None and not args.yes and not args.print and interactive + ) + if use_wizard: + if not interactive: + print("The wizard needs an interactive terminal. Pass a target or --yes.") + return 1 + plans = inst.run_wizard(inst.detect_targets(), default_watched) + else: + targets = [args.target] if args.target else inst.detect_targets() + if not targets: + print( + "No supported agents detected. Pass a target (claude|hermes|codex) " + "or run `coderag install --wizard`." + ) + return 1 + plans = [ + inst.Plan(target=t, watched_dir=explicit_watched, scope=args.scope) + for t in targets + ] + + # Always preview (dry-run) first. + previews = [ + ( + plan, + inst.install( + plan.target, + watched_dir=plan.watched_dir, + scope=plan.scope, + tools=plan.tools, + dry_run=True, + ), + ) + for plan in plans + ] + print() + for _plan, r in previews: + print(f"== {r.target} ({r.path}) [{r.action}]") + if r.detail: + print(textwrap.indent(r.detail.rstrip(), " ")) + print() + + if args.print: + return 0 + if not args.yes and interactive and not _confirm("Apply these changes?"): + print("Aborted.") + return 1 + + final = [ + inst.install( + plan.target, + watched_dir=plan.watched_dir, + scope=plan.scope, + tools=plan.tools, + dry_run=False, + ) + for plan, _ in previews + ] + print() + for r in final: + line = f" [{r.action}] {r.target}: {r.path}" + if r.action in ("manual", "error") and r.detail: + line += f"\n {r.detail.splitlines()[0]}" + print(line) + steps = {_NEXT_STEPS[r.target] for r in final if r.target in _NEXT_STEPS} + if steps: + print("\nNext steps:") + for s in sorted(steps): + print(f" - {s}") + return 0 if all(r.action != "error" for r in final) else 1 + + def cmd_ui(args: argparse.Namespace) -> int: try: from coderag.surfaces.webui import run_ui @@ -392,6 +487,40 @@ def build_parser() -> argparse.ArgumentParser: _add_common(p_mcp) p_mcp.set_defaults(func=cmd_mcp) + p_install = sub.add_parser( + "install", + help="Register CodeRAG's MCP server in an AI agent (Claude Code, Hermes, Codex) " + "— one command instead of hand-editing config.", + ) + p_install.add_argument( + "target", + nargs="?", + choices=("claude", "hermes", "codex"), + help="Agent to install for. Omit to auto-detect (or launch the wizard on a TTY).", + ) + p_install.add_argument( + "--wizard", action="store_true", help="Interactive guided install." + ) + p_install.add_argument( + "--print", + dest="print", + action="store_true", + help="Preview the config changes without writing anything (dry-run).", + ) + p_install.add_argument( + "--yes", + action="store_true", + help="Apply without the confirmation prompt (non-interactive).", + ) + p_install.add_argument( + "--scope", + choices=("user", "project"), + default="project", + help="Config scope where applicable (default project).", + ) + _add_common(p_install) + p_install.set_defaults(func=cmd_install) + p_ui = sub.add_parser("ui", help="Launch the built-in web UI.") p_ui.add_argument( "--host", diff --git a/coderag/surfaces/mcp_server.py b/coderag/surfaces/mcp_server.py index bd17ae3..3231527 100644 --- a/coderag/surfaces/mcp_server.py +++ b/coderag/surfaces/mcp_server.py @@ -10,12 +10,13 @@ Design: like the other surfaces (``cli``/``http_api``/``webui``), this is a thin adapter over the :class:`coderag.api.CodeRAG` facade. Heavy imports (the ``mcp`` SDK) live inside the functions so importing this module stays cheap and the ``[mcp]`` extra is only needed -to actually run it. The four tools route entirely through existing facade methods. +to actually run it. The five tools route entirely through existing facade methods. Note: this module intentionally does NOT use ``from __future__ import annotations`` — the MCP SDK introspects the tools' real type hints to generate their input/output schemas. """ +import json import logging import threading from typing import TYPE_CHECKING, List, Literal, Optional @@ -29,12 +30,19 @@ logger = logging.getLogger(__name__) _INSTRUCTIONS = ( - "CodeRAG indexes this workspace for fast semantic + keyword search. Prefer the " - "search_code tool over grep/glob/read loops to find code or text by meaning or by " - "identifier — it returns ranked results with exact path:line locations in one call. " + "CodeRAG indexes this workspace for fast search. Two complementary search tools, both " + "preferable to grep/glob/find/read loops:\n" + "- search_code: semantic + keyword (hybrid) search. Use it to find code by MEANING or " + "by identifier ('where is retry/backoff handled?'). Returns ranked path:line results.\n" + "- search_files: exact regex/glob search (ripgrep-backed). Use it to find a LITERAL " + "string or pattern, or to locate files by name (target='files', e.g. '*_test.py').\n" "Then use get_file to read a precise range. Call index_status to check freshness." ) +# After this many identical consecutive search calls, the next one is blocked — a guard +# against an agent looping on the same fruitless query (mirrors Hermes' search_files). +_LOOP_LIMIT = 4 + class _State: """Mutable server state shared between the tools and the background threads.""" @@ -42,6 +50,17 @@ class _State: def __init__(self) -> None: self.indexing = False # True while the initial/manual index runs self.stop = threading.Event() # set on shutdown to stop the watcher thread + self._last_key: Optional[str] = None # last search (tool, args) signature + self._repeat = 0 # how many times in a row it has been issued + + def loop_block(self, key: str) -> bool: + """Record a search call and report whether it should be blocked as a loop.""" + if key == self._last_key: + self._repeat += 1 + else: + self._last_key = key + self._repeat = 1 + return self._repeat > _LOOP_LIMIT def _truncate(text: str, max_lines: int) -> "tuple[str, bool]": @@ -98,6 +117,15 @@ def _status_word(state: _State) -> str: return "in_progress" if state.indexing else "ready" +def _loop_error() -> dict: + return { + "error": ( + "Repeated identical search blocked (possible loop). Change the query/pattern, " + "adjust filters, or switch between search_code and search_files." + ) + } + + def build_mcp(cr: "CodeRAG", *, state: Optional[_State] = None) -> "FastMCP": """Build the FastMCP server with CodeRAG's tools wired to the facade. @@ -117,14 +145,16 @@ def search_code( path_prefix: Optional[str] = None, kind: Optional[str] = None, full_text: bool = False, + offset: int = 0, ) -> dict: """Search the indexed workspace by meaning AND keyword (hybrid retrieval). - Use this INSTEAD of grep/glob/read loops to locate code or text: one fast call - returns the most relevant chunks with exact ``path:start-end`` locations. Works for - conceptual questions ("where is retry/backoff handled?") and exact identifiers - alike. Snippets are truncated by default to stay token-cheap — pass - ``full_text=true`` for the whole chunk, or call ``get_file`` for a precise range. + Use this INSTEAD of grep/glob/read loops to locate code or text by MEANING: one + fast call returns the most relevant chunks with exact ``path:start-end`` locations. + Great for conceptual questions ("where is retry/backoff handled?") and identifiers. + For an exact LITERAL string/regex or to find files by name, use ``search_files``. + Snippets are truncated by default to stay token-cheap — pass ``full_text=true`` for + the whole chunk, or call ``get_file`` for a precise range. Args: query: Natural-language question, or a code snippet/identifier to find. @@ -133,42 +163,131 @@ def search_code( path_prefix: Restrict to paths starting with this prefix (e.g. "src/"). kind: Restrict to a chunk kind ("function", "class", "method", "window"). full_text: Return each chunk's full text instead of a truncated snippet. + offset: Skip this many top results — for paging past an earlier call. """ + key = "search_code:" + json.dumps( + [query, top_k, language, path_prefix, kind, full_text, offset], + sort_keys=True, + ) + if state.loop_block(key): + return _loop_error() + offset = max(0, offset) + want = offset + top_k if language or path_prefix or kind: # The searcher can't filter, so pull a deeper pool and filter post-hoc. - pool = max(top_k * 5, cr.config.fetch_k) - hits = _filter_hits( + pool = max(want * 5, cr.config.fetch_k) + filtered = _filter_hits( cr.search(query, top_k=pool), language=language, path_prefix=path_prefix, kind=kind, - )[:top_k] + ) + window, more = filtered[offset:want], len(filtered) > want else: - hits = cr.search(query, top_k=top_k) - return { + fetched = cr.search(query, top_k=want + 1) + window, more = fetched[offset:want], len(fetched) > want + out = { "query": query, - "count": len(hits), + "count": len(window), + "offset": offset, "indexing": _status_word(state), - "results": [_format_hit(h, snippet_lines, full_text) for h in hits], + "results": [_format_hit(h, snippet_lines, full_text) for h in window], } + if more: + out["next_offset"] = want + out["hint"] = f"More results available. Use offset={want} to see more." + return out + + @mcp.tool() + def search_files( + pattern: str, + target: str = "content", + file_glob: Optional[str] = None, + output_mode: str = "content", + context: int = 0, + limit: int = 50, + offset: int = 0, + ignore_case: bool = False, + ) -> dict: + """Exact regex/glob search over the workspace (ripgrep-backed). + + The literal-match complement to ``search_code``. Use this INSTEAD of + grep/rg/find/ls to find an exact string or pattern, or to locate files by name. + Honours the same ignore rules as the index (skips .git, node_modules, build, …). + + Args: + pattern: A regex (``target="content"``) or a filename glob (``target="files"``). + target: ``"content"`` (regex inside files) or ``"files"`` (find files by name). + file_glob: For content search, restrict to files matching this glob (e.g. "*.py"). + output_mode: ``"content"`` (matching lines) | ``"files_only"`` | ``"count"``. + context: Lines of context to include around each content match. + limit: Page size (default 50). + offset: Skip this many results — for paging past an earlier call. + ignore_case: Case-insensitive matching. + """ + key = "search_files:" + json.dumps( + [ + pattern, + target, + file_glob, + output_mode, + context, + limit, + offset, + ignore_case, + ], + sort_keys=True, + ) + if state.loop_block(key): + return _loop_error() + result = cr.search_files( + pattern, + target=target, + file_glob=file_glob, + output_mode=output_mode, + context=context, + limit=limit, + offset=offset, + ignore_case=ignore_case, + ) + result["indexing"] = _status_word(state) + return result @mcp.tool() def get_file( path: str, start_line: Optional[int] = None, end_line: Optional[int] = None, + with_line_numbers: bool = False, ) -> dict: """Return the exact contents of an INDEXED file, optionally a 1-based line range. - Pair with search_code: take a result's path and line range to read precise context. - Only files that are in the index can be read (so this can't fetch arbitrary files - like .env). Returns ``{"error": ...}`` if the path isn't indexed or escapes the - workspace root, rather than failing the call. + Pair with search_code/search_files: take a result's path and line range to read + precise context. Only files that are in the index can be read (so this can't fetch + arbitrary files like .env). On a miss returns ``{"error": ..., "did_you_mean": [...]}`` + with the closest indexed filenames, rather than failing the call. + + Args: + path: Path of an indexed file, relative to the workspace root. + start_line: 1-based first line to return (inclusive). + end_line: 1-based last line to return (inclusive). + with_line_numbers: Prefix each line with ``"LINE|"`` for easy referencing. """ try: content = cr.get_file(path, start_line, end_line) - except (ValueError, FileNotFoundError) as exc: + except FileNotFoundError as exc: + err: dict = {"error": str(exc), "path": path} + suggestions = cr.suggest_paths(path) + if suggestions: + err["did_you_mean"] = suggestions + return err + except ValueError as exc: return {"error": str(exc), "path": path} + if with_line_numbers: + base = start_line or 1 + content = "\n".join( + f"{base + i}|{line}" for i, line in enumerate(content.split("\n")) + ) return { "path": path, "start_line": start_line, diff --git a/tests/test_fs_search.py b/tests/test_fs_search.py new file mode 100644 index 0000000..719be10 --- /dev/null +++ b/tests/test_fs_search.py @@ -0,0 +1,114 @@ +"""Tests for exact filesystem search (:mod:`coderag.fs_search`). + +The pure-Python path is the authoritative implementation and is what these tests force +(``use_ripgrep=False``), so the suite never depends on ripgrep being installed. One +consistency test compares the ripgrep fast path against the Python path when rg is present. +""" + +from __future__ import annotations + +import shutil + +import pytest + +from coderag.fs_search import search_files +from tests.conftest import write + + +@pytest.fixture +def tree(tmp_path): + write(tmp_path / "a.py", "import os\n\ndef alpha():\n return os.getpid()\n") + write(tmp_path / "pkg" / "b.py", "def beta():\n return 'alpha beta'\n") + write(tmp_path / "notes.txt", "alpha mention in text\n") + write(tmp_path / "node_modules" / "dep.py", "def alpha():\n pass\n") # ignored + write(tmp_path / ".git" / "cfg", "alpha\n") # ignored + return tmp_path + + +def test_content_search_finds_matches_and_skips_ignored(tree): + r = search_files(tree, r"def alpha", target="content", use_ripgrep=False) + paths = {row["path"] for row in r["results"]} + assert "a.py" in paths + assert not any("node_modules" in p or p.startswith(".git") for p in paths) + assert r["ripgrep"] is False + + +def test_file_glob_restricts_content_search(tree): + r = search_files( + tree, r"alpha", target="content", file_glob="*.py", use_ripgrep=False + ) + assert r["results"] + assert all(row["path"].endswith(".py") for row in r["results"]) + + +def test_target_files_glob(tree): + r = search_files(tree, "*.py", target="files", use_ripgrep=False) + paths = {row["path"] for row in r["results"]} + assert {"a.py", "pkg/b.py"} <= paths + assert not any("node_modules" in p or p.startswith(".git") for p in paths) + + +def test_output_modes(tree): + files_only = search_files( + tree, r"alpha", target="content", output_mode="files_only", use_ripgrep=False + ) + assert all(set(row) == {"path"} for row in files_only["results"]) + + counts = search_files( + tree, r"alpha", target="content", output_mode="count", use_ripgrep=False + ) + assert counts["results"] and all("count" in row for row in counts["results"]) + + +def test_context_lines(tree): + r = search_files( + tree, r"return os\.getpid", target="content", context=1, use_ripgrep=False + ) + row = r["results"][0] + assert "before" in row and "after" in row + assert any("def alpha" in b for b in row["before"]) + + +def test_pagination(tmp_path): + for i in range(10): + write(tmp_path / f"f{i}.py", "needle\n") + r = search_files(tmp_path, "needle", target="content", limit=4, use_ripgrep=False) + assert r["count"] == 4 and r["truncated"] is True and r["next_offset"] == 4 + assert "offset=4" in r["hint"] + + last = search_files( + tmp_path, "needle", target="content", limit=4, offset=8, use_ripgrep=False + ) + assert last["count"] == 2 and last["truncated"] is False and "hint" not in last + + +def test_redaction(tmp_path): + write(tmp_path / "s.py", 'token = "abcdef123456"\n') + masked = search_files(tmp_path, "token", target="content", use_ripgrep=False) + assert "***" in masked["results"][0]["line"] + + raw = search_files( + tmp_path, "token", target="content", redact=False, use_ripgrep=False + ) + assert "abcdef123456" in raw["results"][0]["line"] + + +def test_invalid_regex_returns_error(tmp_path): + assert "error" in search_files(tmp_path, "(", target="content", use_ripgrep=False) + + +def test_binary_files_skipped(tmp_path): + write(tmp_path / "ok.py", "needle\n") + (tmp_path / "blob.py").write_bytes(b"needle\x00\x01\x02") + r = search_files(tmp_path, "needle", target="content", use_ripgrep=False) + assert {row["path"] for row in r["results"]} == {"ok.py"} + + +def test_ripgrep_matches_python_path(tree): + if shutil.which("rg") is None: + pytest.skip("ripgrep not installed") + rg = search_files(tree, r"alpha", target="content", use_ripgrep=True) + py = search_files(tree, r"alpha", target="content", use_ripgrep=False) + assert rg["ripgrep"] is True and py["ripgrep"] is False + key = lambda res: {(r["path"], r["line_number"]) for r in res["results"]} # noqa: E731 + assert key(rg) == key(py) diff --git a/tests/test_install.py b/tests/test_install.py new file mode 100644 index 0000000..5a9e67e --- /dev/null +++ b/tests/test_install.py @@ -0,0 +1,123 @@ +"""Tests for ``coderag install`` (:mod:`coderag.install`). + +Everything runs against an isolated tmp ``$HOME`` and cwd so no real agent config is +touched. The wizard is driven by feeding scripted answers to ``input``. +""" + +from __future__ import annotations + +import json +import tomllib +from pathlib import Path + +import pytest +import yaml + +from coderag import install as inst + + +@pytest.fixture +def home(tmp_path, monkeypatch): + monkeypatch.setenv("HOME", str(tmp_path)) + repo = tmp_path / "repo" + repo.mkdir() + monkeypatch.chdir(repo) + return tmp_path + + +# --- claude (.mcp.json) --------------------------------------------------------------- + + +def test_claude_creates_and_is_idempotent(home): + r = inst.install("claude") + assert r.action == "created" + data = json.loads(Path(r.path).read_text()) + assert "mcp" in data["mcpServers"]["coderag"]["args"] + assert inst.install("claude").action == "unchanged" + + +def test_claude_merges_existing_and_backs_up(home): + p = Path.cwd() / ".mcp.json" + p.write_text(json.dumps({"mcpServers": {"other": {"command": "x", "args": []}}})) + r = inst.install("claude") + assert r.action == "updated" + data = json.loads(p.read_text()) + assert {"other", "coderag"} <= set(data["mcpServers"]) + assert p.with_suffix(".json.bak").exists() + + +# --- hermes (~/.hermes/config.yaml) --------------------------------------------------- + + +def test_hermes_writes_yaml_with_tools(home): + tools = ["search_code", "search_files"] + r = inst.install("hermes", watched_dir=Path.cwd(), tools=tools) + assert r.action == "created" + data = yaml.safe_load(Path(r.path).read_text()) + entry = data["mcp_servers"]["coderag"] + assert entry["tools"]["include"] == tools + assert "--watched-dir" in entry["args"] + assert ( + inst.install("hermes", watched_dir=Path.cwd(), tools=tools).action + == "unchanged" + ) + + +def test_hermes_manual_without_pyyaml(home, monkeypatch): + monkeypatch.setattr(inst, "yaml", None) + r = inst.install("hermes") + assert r.action == "manual" and "coderag" in r.detail + + +# --- codex (~/.codex/config.toml) ----------------------------------------------------- + + +def test_codex_appends_and_is_idempotent(home): + p = Path.home() / ".codex" / "config.toml" + p.parent.mkdir() + p.write_text("[other]\nx = 1\n") + r = inst.install("codex") + assert r.action == "appended" + data = tomllib.loads(p.read_text()) + assert "other" in data + assert data["mcp_servers"]["coderag"]["args"][0] == "mcp" + assert p.with_suffix(".toml.bak").exists() + assert inst.install("codex").action == "unchanged" + + +def test_codex_conflict_is_manual(home): + p = Path.home() / ".codex" / "config.toml" + p.parent.mkdir() + p.write_text('[mcp_servers.coderag]\ncommand = "old"\nargs = []\n') + assert inst.install("codex").action == "manual" + + +# --- shared behaviour ----------------------------------------------------------------- + + +def test_dry_run_writes_nothing(home): + r = inst.install("claude", dry_run=True) + assert r.action == "would-write" + assert not (Path.cwd() / ".mcp.json").exists() + + +def test_unknown_target_errors(home): + assert inst.install("emacs").action == "error" + + +def test_detect_targets(home, monkeypatch): + monkeypatch.setattr(inst.shutil, "which", lambda *_: None) + assert inst.detect_targets() == [] + (Path.home() / ".hermes").mkdir() + (Path.home() / ".codex").mkdir() + assert set(inst.detect_targets()) == {"hermes", "codex"} + + +def test_wizard_collects_choices(home, monkeypatch): + # answers: target "2" (hermes), keep default workspace, expose all tools "y" + answers = iter(["2", "", "y"]) + monkeypatch.setattr("builtins.input", lambda *_: next(answers)) + plans = inst.run_wizard([], Path.cwd()) + assert len(plans) == 1 + assert plans[0].target == "hermes" + assert plans[0].tools == inst.DEFAULT_TOOLS diff --git a/tests/test_mcp.py b/tests/test_mcp.py index daace4a..208728a 100644 --- a/tests/test_mcp.py +++ b/tests/test_mcp.py @@ -56,7 +56,13 @@ def _call(mcp, name, args): def test_tools_are_registered(tmp_path): cr, mcp, _, _ = _make(tmp_path, DEMO) names = {t.name for t in asyncio.run(mcp.list_tools())} - assert names == {"search_code", "get_file", "index_status", "reindex"} + assert names == { + "search_code", + "search_files", + "get_file", + "index_status", + "reindex", + } cr.close() @@ -115,6 +121,47 @@ def test_search_code_filters(tmp_path): cr.close() +def test_search_files_content_and_files(tmp_path): + cr, mcp, _, _ = _make(tmp_path, DEMO) + + content = _call(mcp, "search_files", {"pattern": "authenticate"}) + assert content["count"] >= 1 + assert any(row["path"] == "auth.py" for row in content["results"]) + assert content["indexing"] == "ready" + + files = _call(mcp, "search_files", {"pattern": "*.ts", "target": "files"}) + assert any(row["path"] == "math.ts" for row in files["results"]) + cr.close() + + +def test_search_code_pagination(tmp_path): + files = { + f"f{i}.py": "def token_retry():\n return 'token retry backoff'\n" + for i in range(6) + } + cr, mcp, _, _ = _make(tmp_path, files) + q = "token retry backoff" + page1 = _call(mcp, "search_code", {"query": q, "top_k": 2, "offset": 0}) + assert page1["count"] == 2 and page1["offset"] == 0 and "next_offset" in page1 + + page2 = _call( + mcp, "search_code", {"query": q, "top_k": 2, "offset": page1["next_offset"]} + ) + assert page2["offset"] == page1["next_offset"] + cr.close() + + +def test_loop_detection_blocks_repeated_search(tmp_path): + cr, mcp, _, _ = _make(tmp_path, DEMO) + args = {"query": "authenticate", "top_k": 3} + for _ in range(4): + assert "error" not in _call(mcp, "search_code", args) + assert "error" in _call(mcp, "search_code", args) # 5th identical call blocked + # a different query resets the guard + assert "error" not in _call(mcp, "search_code", {"query": "add", "top_k": 3}) + cr.close() + + def test_get_file_range_and_structured_errors(tmp_path): cr, mcp, _, _ = _make(tmp_path, DEMO) @@ -127,6 +174,22 @@ def test_get_file_range_and_structured_errors(tmp_path): cr.close() +def test_get_file_line_numbers_and_suggestions(tmp_path): + cr, mcp, _, _ = _make(tmp_path, DEMO) + + numbered = _call( + mcp, + "get_file", + {"path": "auth.py", "start_line": 1, "end_line": 1, "with_line_numbers": True}, + ) + assert numbered["content"] == "1|def authenticate(token):" + + # A near-miss filename returns a "did you mean?" hint instead of a bare error. + miss = _call(mcp, "get_file", {"path": "ath.py"}) + assert "error" in miss and "auth.py" in miss.get("did_you_mean", []) + cr.close() + + def test_index_status_reports_totals_and_flag(tmp_path): cr, mcp, state, _ = _make(tmp_path, DEMO) r = _call(mcp, "index_status", {}) From 0dd286f55863c9737df558a0be13d0f004755387 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 18 Jun 2026 12:40:45 +0000 Subject: [PATCH 2/3] test: avoid gitleaks false-positive in redaction fixture The redaction test wrote a fake `token = "..."` literal that gitleaks' generic-api-key rule flagged as a leak, failing the secret-scan check on PR #52. Use a low-entropy placeholder and a `# gitleaks:allow` marker; the redaction assertion is unaffected. Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_011tgKDQJ8p7YLEzoMz32moC --- tests/test_fs_search.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/test_fs_search.py b/tests/test_fs_search.py index 719be10..c195975 100644 --- a/tests/test_fs_search.py +++ b/tests/test_fs_search.py @@ -83,14 +83,17 @@ def test_pagination(tmp_path): def test_redaction(tmp_path): - write(tmp_path / "s.py", 'token = "abcdef123456"\n') + # Obvious low-entropy placeholder (not a real secret); gitleaks:allow keeps the + # secret-scanner from flagging this test fixture. + fake = "xxxxxxxxxxxx" + write(tmp_path / "s.py", f'token = "{fake}"\n') # gitleaks:allow masked = search_files(tmp_path, "token", target="content", use_ripgrep=False) assert "***" in masked["results"][0]["line"] raw = search_files( tmp_path, "token", target="content", redact=False, use_ripgrep=False ) - assert "abcdef123456" in raw["results"][0]["line"] + assert fake in raw["results"][0]["line"] def test_invalid_regex_returns_error(tmp_path): From bdfbec8ac411d1853056d044cd3bb1f9f3de3287 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 18 Jun 2026 12:44:18 +0000 Subject: [PATCH 3/3] ci: allowlist fake test-fixture secret in gitleaks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The redaction test feeds a dummy `token = "..."` line to verify masking; gitleaks' generic-api-key rule flagged it and failed the secret-scan on PR #52. Because gitleaks scans per-commit diffs, the literal lives in the PR's first commit even after the test was tidied — so suppress it with a narrow repo .gitleaks.toml allowlist (default rules kept) rather than rewriting history. Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_011tgKDQJ8p7YLEzoMz32moC --- .gitleaks.toml | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100644 .gitleaks.toml diff --git a/.gitleaks.toml b/.gitleaks.toml new file mode 100644 index 0000000..21877f3 --- /dev/null +++ b/.gitleaks.toml @@ -0,0 +1,19 @@ +# gitleaks configuration for CodeRAG. +# +# Keeps the full default ruleset and only adds a narrow allowlist for fake, +# secret-shaped strings used in test fixtures (e.g. the redaction test feeds a +# dummy `token = "..."` line to confirm it gets masked). These are not real +# credentials; without this, gitleaks' generic-api-key rule fails CI on test data. + +[extend] +useDefault = true + +[allowlist] +description = "Fake secret-shaped strings in test fixtures (not real secrets)" +# Match if the finding is in this test file OR is the known dummy literal. +paths = [ + '''tests/test_fs_search\.py''', +] +regexes = [ + '''abcdef123456''', +]