Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions coderag/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,14 +180,18 @@ def index(
stats.chunks_removed += removed

# 4. Persist. A full pass that changed something rebuilds the FTS/vector indexes
# and compacts. An incremental/single-file pass skips the compaction (new rows stay
# searchable via LanceDB's flat scan of the unindexed tail) but still asks the
# store to refresh the ANN/FTS indexes when that tail has grown enough to drag
# query latency down (``maybe_reindex`` is a cheap no-op otherwise) — so a watcher
# edit never triggers the full optimize pass, but the tail of brute-forced rows
# also can't grow unbounded and silently degrade retrieval.
changed = stats.files_indexed > 0 or stats.files_removed > 0
if prune and changed:
if live is not None:
live.set_state("optimizing")
self.store.optimize()
elif changed:
self.store.maybe_reindex()
else:
self.store.flush()

Expand Down
128 changes: 113 additions & 15 deletions coderag/store/lance_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@
# LanceDB needs enough rows to train a vector ANN index; below this, brute-force is exact
# and fast, so we skip indexing (also keeps tiny test corpora on the exact path).
# Both ``maybe_reindex`` and ``_build_search_indexes`` compare the table's row count
# against this value and leave the ANN index unbuilt when the count is smaller.
_ANN_MIN_ROWS = 256
# Incremental writes append to an *unindexed tail* that every vector query brute-forces.
# Once that tail grows past this many rows, query latency starts to degrade noticeably
# (a few thousand rows already costs tens of ms), so an incremental pass rebuilds the ANN
# index instead of letting the tail grow unbounded. See ``maybe_reindex``, which only
# rebuilds when the tail is strictly larger than this (a tail of exactly this many rows
# is still left alone).
_ANN_REINDEX_TAIL = 4096
_HYDRATE_COLS = [
"id",
"path",
Expand Down Expand Up @@ -81,6 +86,7 @@ def __init__(self, store_dir: Path, dim: int) -> None:
self._symbol_index_gen = -1
if _CHUNKS in self._db.table_names():
self._next_id = self._max_id() + 1
self._refresh_ann_state()

# --- schema ---

Expand Down Expand Up @@ -309,23 +315,115 @@ def optimize(self) -> None:
tbl.cleanup_old_versions()
except Exception: # pragma: no cover - compaction is best-effort
logger.exception("LanceDB optimize failed (continuing).")
self._build_search_indexes(tbl)

def maybe_reindex(self) -> bool:
    """Refresh the search indexes after an incremental pass, only when it pays off.

    Rows written incrementally land in an unindexed tail that vector queries have to
    brute-force. This method flushes pending writes and then decides:

    * no chunks table, or fewer than ``_ANN_MIN_ROWS`` rows -> nothing to build;
    * a vector index exists and its tail is at most ``_ANN_REINDEX_TAIL`` rows ->
      leave it alone;
    * otherwise (no vector index at scale, or the tail is too long) -> rebuild via
      ``_build_search_indexes``.

    Unlike ``optimize`` it never compacts. ``_ann_built`` is updated on every path
    that reaches a decision about the ANN index.

    Returns:
        True iff the search indexes were rebuilt.
    """
    with self._lock:
        self._flush()

        table_missing = _CHUNKS not in self._db.table_names()
        if table_missing:
            return False

        table = self._db.open_table(_CHUNKS)
        if table.count_rows() < _ANN_MIN_ROWS:
            # Too small for an ANN index; brute-force is the intended path.
            self._ann_built = False
            return False

        index_stats = self._vector_index_stats(table)
        # A missing index means the entire table is effectively the tail, so it
        # always qualifies for a build once we are past the minimum row count.
        needs_rebuild = index_stats is None or index_stats[1] > _ANN_REINDEX_TAIL
        if not needs_rebuild:
            self._ann_built = True
            return False

        self._build_search_indexes(table)
        return True

def _build_search_indexes(self, tbl: Any) -> None:
    """(Re)build the FTS, scalar (id/path), and vector ANN indexes for ``tbl``.

    Each build is independent and best-effort: a failure is logged loudly and leaves
    that lookup on LanceDB's brute-force/scan fallback rather than aborting the others.
    ``_ann_built`` tracks the *real* state so a silent fallback is observable via
    ``index_kind`` instead of masquerading as an ANN index.

    Args:
        tbl: The open LanceDB chunks table to index.

    Side effects:
        Sets ``self._ann_built`` to True only when the vector index build succeeded;
        it is set to False when the table is below ``_ANN_MIN_ROWS`` or the build
        raised.
    """
    # 1. Full-text index over the chunk text (built exactly once per call).
    try:
        tbl.create_fts_index("text", replace=True)
    except Exception:  # pragma: no cover
        logger.exception("LanceDB FTS index build failed (lexical scan fallback).")

    # 2. Scalar indexes make hydrate's ``id IN (...)`` and path deletes index lookups
    #    instead of full-table scans (which grow with the corpus).
    for col in ("id", "path"):
        try:
            tbl.create_scalar_index(col, replace=True)
        except Exception:  # pragma: no cover
            logger.exception(
                "LanceDB scalar index on %s failed (scan fallback).", col
            )

    # 3. Vector ANN index, only once there are enough rows to train it.
    n = tbl.count_rows()
    if n < _ANN_MIN_ROWS:
        # Brute-force is exact and fast at this scale; no ANN index to maintain.
        self._ann_built = False
        return
    try:
        # Partition count grows with sqrt(n) but is capped at n // 39, and never
        # drops below one.
        nlist = max(1, min(int(4 * math.sqrt(n)), n // 39))
        tbl.create_index(
            metric="cosine",
            vector_column_name="vector",
            num_partitions=nlist,
            replace=True,
        )
        self._ann_built = True
    except Exception:  # pragma: no cover - falls back to brute-force search
        logger.exception(
            "LanceDB vector index build failed (brute-force fallback)."
        )
        self._ann_built = False

def _vector_index_stats(self, tbl: Any) -> Optional[Tuple[int, int]]:
    """``(indexed_rows, unindexed_rows)`` for the vector ANN index, or None if absent.

    The unindexed count is the brute-forced tail; it drives the reindex decision.
    Robust to lancedb version differences in the index/stats object shape: index
    descriptors and stats are read either as attributes or as mapping keys, and a
    ``None`` stats result is reported as "no index" rather than as an empty tail.

    Args:
        tbl: The open LanceDB table to inspect.

    Returns:
        ``(indexed, unindexed)`` row counts for the first index covering the
        ``vector`` column, or None when no such index can be found or queried.
    """

    def pick(obj: Any, *names: str) -> Any:
        """Return the first truthy value among ``names`` (attribute or dict key)."""
        for key in names:
            if isinstance(obj, dict):
                value = obj.get(key)
            else:
                value = getattr(obj, key, None)
            if value:
                return value
        return None

    try:
        indices = tbl.list_indices()
    except Exception:  # pragma: no cover - older/newer API shape
        return None

    name: Optional[str] = None
    for idx in indices or ():
        cols = pick(idx, "columns", "column_names") or []
        if isinstance(cols, str):
            # A bare string would otherwise be substring-matched by ``in``.
            cols = [cols]
        if "vector" in cols:
            name = pick(idx, "name", "index_name")
            break
    if name is None:
        return None

    try:
        stats = tbl.index_stats(name)
    except Exception:  # pragma: no cover
        return None
    if stats is None:
        # No statistics for this index: treat it as absent so callers rebuild
        # instead of assuming a fully indexed table with an empty tail.
        return None

    indexed = int(pick(stats, "num_indexed_rows") or 0)
    unindexed = int(pick(stats, "num_unindexed_rows") or 0)
    return indexed, unindexed

def _refresh_ann_state(self) -> None:
    """Derive ``_ann_built`` from the on-disk table (called on open).

    The flag ends up True only when the chunks table exists and
    ``_vector_index_stats`` reports a vector index for it; in every other case it
    is False.
    """
    has_vector_index = False
    if _CHUNKS in self._db.table_names():
        chunks_tbl = self._db.open_table(_CHUNKS)
        has_vector_index = self._vector_index_stats(chunks_tbl) is not None
    self._ann_built = has_vector_index

@property
def index_kind(self) -> str:
Expand Down
37 changes: 37 additions & 0 deletions tests/test_lance_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,43 @@ def test_distinct_and_fts_sanitization(tmp_path):
assert st.lexical_search("", 5) == [] # empty query


def test_maybe_reindex_builds_ann_on_incremental_tail(tmp_path):
    """A corpus grown only by incremental writes must land on the ANN index.

    Regression guard: incremental writes used to flush and nothing more, so rows
    accumulated in an unindexed tail that every vector query brute-forced. Once the
    corpus is past the ANN minimum, ``maybe_reindex`` has to build the index and
    leave no unindexed rows behind; a second call with no new rows must do nothing,
    and a freshly opened store must see the index too.
    """
    from coderag.store.lance_store import _ANN_MIN_ROWS

    def ann_stats(store):
        # Vector-index stats straight from the chunks table on disk.
        return store._vector_index_stats(store._db.open_table("chunks"))

    st, prov = _store(tmp_path)

    # Simulate a watcher session: cross the ANN minimum without ever optimizing.
    n_files = (_ANN_MIN_ROWS // 4) + 5
    for f in range(n_files):
        chunks = []
        for i in range(4):
            chunks.append(_chunk(f"def fn_{f}_{i}(): return {i}", f"fn_{f}_{i}"))
        _add(st, prov, f"m{f}.py", chunks)
    st.flush()

    # Before: brute-force only, no vector index on disk.
    assert st.index_kind == "lancedb"
    assert ann_stats(st) is None

    # The first call builds the index and drains the tail completely.
    rebuilt = st.maybe_reindex()
    assert rebuilt is True
    assert st.index_kind == "lancedb-ann"
    indexed, unindexed = ann_stats(st)
    assert indexed == st.total_chunks()
    assert unindexed == 0

    # With nothing new written, the second call is a no-op and the index remains.
    rebuilt_again = st.maybe_reindex()
    assert rebuilt_again is False
    assert st.index_kind == "lancedb-ann"

    # Reopening reads the index state back from disk rather than from memory.
    reopened = LanceStore(tmp_path / "store", prov.dim)
    assert reopened.index_kind == "lancedb-ann"


def test_clear_empties_store(tmp_path):
st, prov = _store(tmp_path)
_add(st, prov, "a.py", [_chunk("a")])
Expand Down
Loading