Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 37 additions & 17 deletions coderag/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from coderag._lines import split_lines
from coderag.config import Config
from coderag.types import IndexStats, SearchHit
from coderag.types import IndexProgress, IndexStats, SearchHit

if TYPE_CHECKING: # avoid import-time cost / cycles
from coderag.embeddings import EmbeddingProvider
Expand All @@ -39,6 +39,12 @@ def __init__(self, config: Optional[Config] = None) -> None:
# surface, the MCP server's background index, and the live watcher) can't
# interleave a file's delete-before-add sequence. Reads (search) are unaffected.
self._index_lock = threading.Lock()
# Guards the lazy construction of the collaborators below. The MCP server now serves
# the protocol before warm-up finishes, so a query can land while the background
# bootstrap is still building the store/provider — without this lock two threads could
# each construct a second (conflicting) LanceStore. Reentrant because the properties
# depend on each other (e.g. ``store`` reads ``provider`` while holding the lock).
self._build_lock = threading.RLock()

# --- lazily constructed collaborators ---

Expand All @@ -47,27 +53,34 @@ def provider(self) -> "EmbeddingProvider":
if self._provider is None:
from coderag.embeddings import get_provider

self._provider = get_provider(self.config)
with self._build_lock:
if self._provider is None:
self._provider = get_provider(self.config)
return self._provider

@property
def store(self) -> "LanceStore":
if self._store is None:
from coderag.store.lance_store import LanceStore

self.config.store_dir.mkdir(parents=True, exist_ok=True)
self._store = LanceStore(self.config.store_dir, self.provider.dim)
# Clears the store when the embedding model/dim changed; a re-index then
# repopulates the now-empty tables (there is no separate cache to rebuild).
self._store.bootstrap(self.provider.dim, self.provider.model_id)
with self._build_lock:
if self._store is None:
self.config.store_dir.mkdir(parents=True, exist_ok=True)
store = LanceStore(self.config.store_dir, self.provider.dim)
# Clears the store when the embedding model/dim changed; a re-index then
# repopulates the now-empty tables (no separate cache to rebuild).
store.bootstrap(self.provider.dim, self.provider.model_id)
self._store = store
return self._store

@property
def indexer(self) -> "Indexer":
if self._indexer is None:
from coderag.indexer import Indexer

self._indexer = Indexer(self.config, self.provider, self.store)
with self._build_lock:
if self._indexer is None:
self._indexer = Indexer(self.config, self.provider, self.store)
return self._indexer

@property
Expand All @@ -76,27 +89,34 @@ def searcher(self) -> "HybridSearcher":
from coderag.retrieval.rerank import get_reranker
from coderag.retrieval.search import HybridSearcher

self._searcher = HybridSearcher(
self.config,
self.provider,
self.store,
reranker=get_reranker(self.config),
)
with self._build_lock:
if self._searcher is None:
self._searcher = HybridSearcher(
self.config,
self.provider,
self.store,
reranker=get_reranker(self.config),
)
return self._searcher

# --- public operations ---

def index(
self, path: Optional[Union[str, Path]] = None, *, full: bool = False
self,
path: Optional[Union[str, Path]] = None,
*,
full: bool = False,
live: Optional[IndexProgress] = None,
) -> IndexStats:
"""Incrementally index ``path`` (defaults to the configured watched dir).

Only files whose content hash changed are re-embedded. Pass ``full=True`` to
force a clean rebuild.
force a clean rebuild. Pass ``live`` (an :class:`IndexProgress`) to receive live,
pollable progress — the MCP server uses this so ``index_status`` reflects the run.
"""
target = Path(path).expanduser() if path else self.config.watched_dir
with self._index_lock:
return self.indexer.index(target, full=full)
return self.indexer.index(target, full=full, live=live)

def search(self, query: str, top_k: Optional[int] = None) -> List[SearchHit]:
"""Hybrid (dense + lexical) search over the indexed codebase."""
Expand Down
49 changes: 44 additions & 5 deletions coderag/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,19 @@
from coderag.chunking.languages import detect_language
from coderag.config import Config
from coderag.embeddings import EmbeddingProvider
from coderag.types import Chunk, IndexStats
from coderag.types import Chunk, IndexProgress, IndexStats

if TYPE_CHECKING:
from coderag.store.lance_store import LanceStore

logger = logging.getLogger(__name__)

# During a long initial index, commit buffered rows roughly every this many seconds (checked
# after each file is written, so one slow file can stretch the interval) so dense search can
# return partial results before the steady-state 8192-chunk flush boundary. Only applied when
# a live progress object is supplied (the MCP background index), so the CLI/watcher keep their
# single-flush-at-end batching.
_PARTIAL_FLUSH_SECS = 5.0


class _ProgressReporter:
"""Live, human-facing indexing progress, written to stderr (stdout stays clean).
Expand Down Expand Up @@ -98,11 +104,21 @@ def index(
*,
full: bool = False,
progress: bool = False,
live: Optional[IndexProgress] = None,
) -> IndexStats:
"""Index ``target`` incrementally.

``progress`` enables the human-facing stderr narration; ``live`` is an optional
machine-readable :class:`IndexProgress` the caller can poll concurrently (used by the
MCP server's background index so ``index_status`` reflects live state). Both default
off, so existing callers (CLI/watcher/tests) are unaffected.
"""
root = self.config.watched_dir.resolve()
target = (target or self.config.watched_dir).resolve()
prune = target == root # only a full-root pass removes vanished files
rep = _ProgressReporter(progress)
if live is not None:
live.begin("scanning")

stats = IndexStats()
if full:
Expand All @@ -121,6 +137,8 @@ def index(
stats.files_skipped += 1
else:
work.append(item)
if live is not None:
live.saw_file(len(walked), len(work))
rep.update(
f"Scanning {target} — {len(walked)} file(s) seen, "
f"{len(work)} to index, {stats.files_skipped} unchanged/skipped…"
Expand All @@ -139,10 +157,20 @@ def index(
# 2. (Re)index changed files. Chunking + embedding (the CPU/network cost) may run
# in parallel across files (config.index_workers); the store writes stay on this
# single thread to preserve the delete-before-add invariant and single writer.
for added, removed in self._embed_and_write(work, reporter=rep):
if live is not None and work:
live.set_state("indexing")
last_flush = time.monotonic()
for item, added, removed in self._embed_and_write(work, reporter=rep):
stats.chunks_added += added
stats.chunks_removed += removed
stats.files_indexed += 1
if live is not None:
live.wrote_file(item.rel, added)
# Commit periodically so dense search picks up partials during a long initial
# index, instead of waiting for the 8192-chunk boundary or the final persist.
if time.monotonic() - last_flush > _PARTIAL_FLUSH_SECS:
self.store.flush()
last_flush = time.monotonic()

# 3. Prune files that disappeared from disk (full-root passes only).
if prune:
Expand All @@ -157,6 +185,8 @@ def index(
# never triggers a whole-index rebuild.
changed = stats.files_indexed > 0 or stats.files_removed > 0
if prune and changed:
if live is not None:
live.set_state("optimizing")
self.store.optimize()
else:
self.store.flush()
Expand All @@ -168,6 +198,8 @@ def index(
f"✓ Indexed {stats.files_indexed} file(s) — "
f"{stats.total_files} total / {stats.total_chunks} chunks."
)
if live is not None:
live.finish("ready")
return stats

# --- internals ---
Expand Down Expand Up @@ -220,13 +252,17 @@ def _maybe_work(

def _embed_and_write(
self, work: List[_Work], *, reporter: _ProgressReporter
) -> Iterator[Tuple[int, int]]:
) -> Iterator[Tuple[_Work, int, int]]:
"""Chunk+embed each file (optionally across worker threads) and apply the writes.

Embedding is the expensive, parallelizable step and touches no shared mutable
state, so it runs in a thread pool when ``index_workers > 1``. The store writes are
drained here on the single calling thread, so the no-duplicate (delete-before-add)
invariant and the single-writer store are preserved.

Yields ``(item, chunks_added, chunks_removed)`` per file — the ``_Work`` item is
surfaced so the caller can report the current path (the worker pool completes out of
order, so positional zipping back to ``work`` is not possible).
"""
if not work:
return
Expand All @@ -239,14 +275,17 @@ def _embed_and_write(
with ThreadPoolExecutor(max_workers=workers) as pool:
futures = {pool.submit(self._prepare, item): item for item in work}
for fut in as_completed(futures):
item = futures[fut]
chunks, vectors = fut.result()
yield self._write(futures[fut], chunks, vectors)
added, removed = self._write(item, chunks, vectors)
yield item, added, removed
done += 1
reporter.update(f"Embedding {done}/{total} file(s)…")
else:
for item in work:
chunks, vectors = self._prepare(item)
yield self._write(item, chunks, vectors)
added, removed = self._write(item, chunks, vectors)
yield item, added, removed
done += 1
reporter.update(f"Embedding {done}/{total} file(s)…")

Expand Down
Loading
Loading