Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions api/src/org/labkey/api/exp/Lsid.java
Original file line number Diff line number Diff line change
Expand Up @@ -308,9 +308,9 @@ static public String namespaceLikeString(String namespace)
return "urn:lsid:%:" + namespace + ".%:%";
}

static public String namespaceFilter(String columnName, String namespace)
static public SQLFragment namespaceFilter(Enum<?> column, String namespace)
{
return columnName + " LIKE '" + namespaceLikeString(namespace) + "'";
return new SQLFragment().appendIdentifier(column.name()).append(" LIKE ?").add(namespaceLikeString(namespace));
}

/**
Expand Down
19 changes: 17 additions & 2 deletions api/src/org/labkey/api/exp/api/ExperimentService.java
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,23 @@ enum DataTypeForExclusion
List<? extends ExpRun> getExpRuns(Container container, @Nullable ExpProtocol parentProtocol, @Nullable ExpProtocol childProtocol);

List<? extends ExpRun> getExpRuns(Container container, @Nullable ExpProtocol parentProtocol, @Nullable ExpProtocol childProtocol, @NotNull Predicate<ExpRun> filterFn);

List<? extends ExpRun> getExpRuns(@Nullable SQLFragment filterSQL, @NotNull Predicate<ExpRun> filterFn, @NotNull Container container);

/**
* Returns the runs in {@code container} that satisfy an optional SQL filter and an in-memory predicate,
* ordered by RowId and optionally capped at {@code limit} rows.
*
* @param filterSQL optional additional WHERE predicates; callers doing keyset pagination should include
* {@code ER.RowId > minRowId} here
* @param filterFn predicate every returned run must pass. NOTE(review): ExperimentServiceImpl evaluates this
* in memory AFTER the SQL row limit has been applied, so a selective predicate can yield fewer than
* {@code limit} rows even though more matching rows exist; callers that detect the last page by comparing
* the result size to {@code limit} should pass an always-true predicate and filter in {@code filterSQL}
* @param container container whose runs are queried
* @param limit max rows to return; pass {@code Table.ALL_ROWS} (-1) for no limit
* @return up to {@code limit} ExpRuns in {@code container} matching {@code filterSQL}, ordered by RowId
*/
List<? extends ExpRun> getExpRuns(@Nullable SQLFragment filterSQL, @NotNull Predicate<ExpRun> filterFn, @NotNull Container container, int limit);

/**
* Fetches one page of assay batches, intended for keyset pagination by RowId.
*
* @param container container whose batches are queried
* @param batchProtocol batch protocol the returned batches belong to
* @param modifiedSince optional exclusive lower bound on Modified — only batches with
* Modified &gt; modifiedSince are returned; pass {@code null} to return all batches
* @param minRowId keyset cursor — only batches with RowId &gt; minRowId are returned; pass 0 for the first page
* @param limit max rows to return
* @return up to {@code limit} assay batches for {@code batchProtocol} in {@code container} with
* RowId &gt; minRowId (and Modified &gt; modifiedSince when non-null), ordered by RowId
*/
List<? extends ExpExperiment> getExpBatches(@NotNull Container container, @NotNull ExpProtocol batchProtocol, @Nullable Date modifiedSince, long minRowId, int limit);

List<? extends ExpRun> getExpRunsForJobId(long jobId);

Expand Down
20 changes: 15 additions & 5 deletions api/src/org/labkey/api/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public interface SearchService extends SearchMXBean
Logger _log = LogHelper.getLogger(SearchService.class, "Full text search service");

long DEFAULT_FILE_SIZE_LIMIT = 100L; // 100 MB
// Page size shared by indexing code that fetches rows in bounded batches and requeues when a batch comes back full
int INDEXING_LIMIT = 1_000;

/**
* Returns the max file size indexed
Expand Down Expand Up @@ -494,12 +495,21 @@ public String normalizeHref(Path contextPath, Container c)
interface DocumentProvider
{
/**
* Enumerate documents for full-text search. Unless it's known there will be a small number of documents
* added to the queue, add Runnable to the IndexTask that adds the Resources from the container to the queue.
* If there are potentially many documents for a container, add resources in batches of 1,000 or so to avoid
* a huge memory footprint.
* Enumerate documents for full-text search indexing. Do NOT fetch an unbounded result set into memory.
*
* @param modifiedSince when null, do a full reindex; otherwise incremental (either modified > modifiedSince, or modified > lastIndexed)
* <p><em>Pattern 1 — recursive requeue</em> (preferred when the underlying table supports keyset pagination).
* Fetch at most {@link SearchService#INDEXING_LIMIT} rows, process them, then re-enqueue the next batch
* only if the batch was full. This keeps the ResultSet closed between batches and interleaves with other
* queue work. See {@code ExperimentServiceImpl.indexMaterials()} and
* {@code AssayManager.indexAssayRuns()} for examples.</p>
*
* <p><em>Pattern 2 — forEachBatch + per-batch runnable</em> (simpler when using {@code TableSelector}).
* Stream rows in batches of {@link SearchService#INDEXING_LIMIT} and wrap each batch in a
* {@code queue.addRunnable()} so indexing is deferred. See
* {@code InventoryManager.indexLocations()} and {@code NotebookManager.indexNotebooks()}
* for examples.</p>
*
* @param modifiedSince when null, do a full reindex; otherwise incremental (either modified &gt; modifiedSince, or modified &gt; lastIndexed)
*/
void enumerateDocuments(TaskIndexingQueue adder, @Nullable Date modifiedSince);

Expand Down
49 changes: 34 additions & 15 deletions assay/src/org/labkey/assay/AssayManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.labkey.assay;

import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -710,33 +711,51 @@ public void indexAssayBatches(SearchService.TaskIndexingQueue queue, @Nullable D
for (ExpProtocol protocol : getAssayProtocols(queue.getContainer()))
{
if (shouldIndexProtocolBatches(protocol))
indexAssayBatches(queue, protocol, modifiedSince);
queue.addRunnable((q) -> indexAssayBatches(q, protocol, modifiedSince, 0));
}
}

private void indexAssayBatches(SearchService.TaskIndexingQueue queue, ExpProtocol protocol, @Nullable Date modifiedSince)
private void indexAssayBatches(SearchService.TaskIndexingQueue queue, ExpProtocol protocol,
@Nullable Date modifiedSince, long minRowId)
{
if (shouldIndexProtocolBatches(protocol))
{
for (ExpExperiment batch : protocol.getBatches(queue.getContainer()))
{
if (modifiedSince == null || modifiedSince.before(batch.getModified()))
indexAssayBatch(queue, batch);
}
}
List<? extends ExpExperiment> batches = ExperimentService.get().getExpBatches(
queue.getContainer(), protocol, modifiedSince, minRowId, SearchService.INDEXING_LIMIT);

MutableLong maxRowIdProcessed = new MutableLong(minRowId);
batches.forEach(b -> {
indexAssayBatch(queue, b);
maxRowIdProcessed.setValue(Math.max(maxRowIdProcessed.longValue(), b.getRowId()));
});

if (batches.size() == SearchService.INDEXING_LIMIT)
queue.addRunnable((q) -> indexAssayBatches(q, protocol, modifiedSince, maxRowIdProcessed.longValue()));
}

public void indexAssayRuns(SearchService.TaskIndexingQueue queue, @Nullable Date modifiedSince)
{
for (ExpProtocol protocol : getAssayProtocols(queue.getContainer()))
indexAssayRuns(queue, protocol, modifiedSince);
queue.addRunnable((q) -> indexAssayRuns(q, protocol, modifiedSince, 0));
}

private void indexAssayRuns(SearchService.TaskIndexingQueue queue, ExpProtocol protocol, @Nullable Date modifiedSince)
private void indexAssayRuns(SearchService.TaskIndexingQueue queue, ExpProtocol protocol,
@Nullable Date modifiedSince, long minRowId)
{
ExperimentService.get().getExpRuns(queue.getContainer(), protocol, null, run ->
modifiedSince == null || modifiedSince.before(run.getModified())
).forEach(r -> indexAssayRun(queue, r));
SQLFragment filterSQL = new SQLFragment("ER.ProtocolLSID = ? AND ER.RowId > ?")
.add(protocol.getLSID())
.add(minRowId);
if (modifiedSince != null)
filterSQL.append(" AND ER.Modified > ?").add(modifiedSince);

List<? extends ExpRun> runs = ExperimentService.get().getExpRuns(filterSQL, _ -> true, queue.getContainer(), SearchService.INDEXING_LIMIT);

MutableLong maxRowIdProcessed = new MutableLong(minRowId);
runs.forEach(r -> {
indexAssayRun(queue, r);
maxRowIdProcessed.setValue(Math.max(maxRowIdProcessed.longValue(), r.getRowId()));
});

if (runs.size() == SearchService.INDEXING_LIMIT)
queue.addRunnable((q) -> indexAssayRuns(q, protocol, modifiedSince, maxRowIdProcessed.longValue()));
}

@Override
Expand Down
51 changes: 34 additions & 17 deletions experiment/src/org/labkey/experiment/api/ExperimentServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ public List<ExpRunImpl> getExpRuns(Container container, @Nullable ExpProtocol pa
sql.add(childProtocol.getLSID());
}

return getExpRuns(sql, filterFn, container);
return getExpRuns(sql, filterFn, container, Table.ALL_ROWS);
}

@Override
Expand All @@ -582,24 +582,43 @@ public boolean hasExpRuns(Container container, @NotNull Predicate<ExpRun> filter
}

@Override
public List<ExpRunImpl> getExpRuns(@Nullable SQLFragment filterSQL, @NotNull Predicate<ExpRun> filterFn, @NotNull Container container)
public List<ExpRunImpl> getExpRuns(@Nullable SQLFragment filterSQL, @NotNull Predicate<ExpRun> filterFn, @NotNull Container container, int limit)
{
SQLFragment sql = new SQLFragment(" SELECT ER.* "
+ " FROM exp.ExperimentRun ER "
+ " WHERE ER.Container = ? ");
SQLFragment sql = new SQLFragment("SELECT ER.* FROM exp.ExperimentRun ER WHERE ER.Container = ?");
sql.add(container.getId());

if (null != filterSQL && !filterSQL.isEmpty())
sql.append(" AND " ).append(filterSQL);

sql.append(" ORDER BY ER.RowId ");
sql.append(" AND ").append(filterSQL);
sql.append(" ORDER BY ER.RowId");
if (limit > 0)
{
sql = getSchema().getSqlDialect().limitRows(sql, limit);
}

try (Stream<ExperimentRun> runs = new SqlSelector(getSchema(), sql).setJdbcCaching(false).uncachedStream(ExperimentRun.class))
{
return runs.map(ExpRunImpl::new).filter(filterFn).toList();
}
}

/**
 * Loads one RowId-ordered page of batches for the given batch protocol and container.
 * Only rows with RowId greater than {@code minRowId} are selected, and, when {@code modifiedSince}
 * is non-null, only rows with Modified greater than it. The row limit is applied only when
 * {@code limit} is positive.
 */
@Override
public List<? extends ExpExperiment> getExpBatches(@NotNull Container container, @NotNull ExpProtocol batchProtocol,
        @Nullable Date modifiedSince, long minRowId, int limit)
{
    // Base query: container + batch protocol + keyset cursor
    SQLFragment batchQuery = new SQLFragment("SELECT E.* FROM ").append(getTinfoExperiment(), "E");
    batchQuery.append(" WHERE E.Container = ?");
    batchQuery.add(container.getId());
    batchQuery.append(" AND E.BatchProtocolId = ?");
    batchQuery.add(batchProtocol.getRowId());
    batchQuery.append(" AND E.RowId > ?");
    batchQuery.add(minRowId);

    // Optional incremental filter
    if (null != modifiedSince)
    {
        batchQuery.append(" AND E.Modified > ?");
        batchQuery.add(modifiedSince);
    }

    // Stable ordering is what makes the RowId cursor valid across pages
    batchQuery.append(" ORDER BY E.RowId");

    SQLFragment pagedQuery = batchQuery;
    if (limit > 0)
    {
        pagedQuery = getSchema().getSqlDialect().limitRows(batchQuery, limit);
    }

    SqlSelector selector = new SqlSelector(getSchema(), pagedQuery);
    selector.setJdbcCaching(false);
    return ExpExperimentImpl.fromExperiments(selector.getArray(Experiment.class));
}

@Override
public List<ExpRunImpl> getExpRunsForJobId(long jobId)
{
Expand Down Expand Up @@ -745,7 +764,7 @@ public List<ExpDataImpl> getExpDatas(Container container, @Nullable DataType typ
{
SimpleFilter filter = SimpleFilter.createContainerFilter(container);
if (type != null)
filter.addWhereClause(Lsid.namespaceFilter(ExpDataTable.Column.LSID.name(), type.getNamespacePrefix()), null);
filter.addWhereClause(Lsid.namespaceFilter(ExpDataTable.Column.LSID, type.getNamespacePrefix()));
if (name != null)
filter.addCondition(FieldKey.fromParts(ExpDataTable.Column.Name.name()), name);

Expand All @@ -756,7 +775,7 @@ public List<ExpDataImpl> getOutputDatas(long runRowId, @Nullable DataType type)
{
SimpleFilter filter = new SimpleFilter(FieldKey.fromParts("RunId"), runRowId);
if (type != null)
filter.addWhereClause(Lsid.namespaceFilter(ExpDataTable.Column.LSID.name(), type.getNamespacePrefix()), null);
filter.addWhereClause(Lsid.namespaceFilter(ExpDataTable.Column.LSID, type.getNamespacePrefix()));

return getExpDatas(filter);
}
Expand Down Expand Up @@ -1018,8 +1037,6 @@ public List<ExpMaterialImpl> getExpMaterialsByObjectId(ContainerFilter container
return result;
}

private static final int INDEXING_LIMIT = 1_000;

@Override
public void enumerateDocuments(SearchService.TaskIndexingQueue queue, final Date modifiedSince)
{
Expand Down Expand Up @@ -1076,7 +1093,7 @@ private void indexMaterials(final @NotNull SearchService.TaskIndexingQueue queue
if (!modifiedSQL.isEmpty())
sql.append(" AND ").append(modifiedSQL);
sql.append(" ORDER BY RowId");
sql = getSchema().getSqlDialect().limitRows(sql, INDEXING_LIMIT);
sql = getSchema().getSqlDialect().limitRows(sql, SearchService.INDEXING_LIMIT);
SqlSelector selector = new SqlSelector(getSchema(), sql);
selector.setJdbcCaching(false);
MutableLong maxRowIdProcessed = new MutableLong(minRowId);
Expand All @@ -1089,7 +1106,7 @@ private void indexMaterials(final @NotNull SearchService.TaskIndexingQueue queue
maxRowIdProcessed.setValue(Math.max(maxRowIdProcessed.longValue(), expMaterial.getRowId()));
});

if (materials.size() == INDEXING_LIMIT)
if (materials.size() == SearchService.INDEXING_LIMIT)
{
// Requeue for the next batch. This avoids overwhelming the indexer's queue with documents
queue.addRunnable((q) -> indexMaterials(q, modifiedSince, maxRowIdProcessed.longValue()));
Expand All @@ -1114,7 +1131,7 @@ public void indexData(final @NotNull SearchService.TaskIndexingQueue queue, fina
sql.append(" AND ").append(modifiedSQL);
sql.append(" ORDER BY RowId");

sql = getSchema().getSqlDialect().limitRows(sql, INDEXING_LIMIT);
sql = getSchema().getSqlDialect().limitRows(sql, SearchService.INDEXING_LIMIT);
SqlSelector selector = new SqlSelector(getSchema(), sql);
selector.setJdbcCaching(false);
MutableLong maxRowIdProcessed = new MutableLong(minRowId);
Expand All @@ -1127,7 +1144,7 @@ public void indexData(final @NotNull SearchService.TaskIndexingQueue queue, fina
maxRowIdProcessed.setValue(Math.max(maxRowIdProcessed.longValue(), expData.getRowId()));
});

if (data.size() == INDEXING_LIMIT)
if (data.size() == SearchService.INDEXING_LIMIT)
{
// Requeue for the next batch. This avoids overwhelming the indexer's queue with documents
queue.addRunnable((q) -> indexData(q, modifiedSince, maxRowIdProcessed.longValue()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.labkey.experiment.api;

import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Strings;
import org.apache.commons.math3.util.Precision;
Expand Down Expand Up @@ -346,11 +347,11 @@ public void indexSampleType(ExpSampleType sampleType, SearchService.TaskIndexing
impl.index(q, null);
}

indexSampleTypeMaterials(sampleType, q);
indexSampleTypeMaterials(sampleType, q, 0);
});
}

private void indexSampleTypeMaterials(ExpSampleType sampleType, SearchService.TaskIndexingQueue queue)
private void indexSampleTypeMaterials(ExpSampleType sampleType, SearchService.TaskIndexingQueue queue, long minRowId)
{
// Index all ExpMaterial that have never been indexed OR where either the ExpSampleType definition or ExpMaterial itself has changed since last indexed
SQLFragment sql = new SQLFragment("SELECT m.* FROM ")
Expand All @@ -359,17 +360,28 @@ private void indexSampleTypeMaterials(ExpSampleType sampleType, SearchService.Ta
.append(ExperimentServiceImpl.get().getTinfoMaterialIndexed(), "mi")
.append(" ON m.RowId = mi.MaterialId WHERE m.LSID NOT LIKE ").appendValue("%:" + StudyService.SPECIMEN_NAMESPACE_PREFIX + "%", getExpSchema().getSqlDialect())
.append(" AND m.cpasType = ?").add(sampleType.getLSID())
.append(" AND m.RowId > ?").add(minRowId)
.append(" AND (mi.lastIndexed IS NULL OR mi.lastIndexed < ? OR (m.modified IS NOT NULL AND mi.lastIndexed < m.modified))")
.append(" ORDER BY m.RowId") // Issue 51263: order by RowId to reduce deadlock
.add(sampleType.getModified());

new SqlSelector(getExpSchema().getScope(), sql).forEachBatch(Material.class, 1000, batch -> {
for (Material m : batch)
{
ExpMaterialImpl impl = new ExpMaterialImpl(m);
impl.index(queue, null /* null tableInfo since samples may belong to multiple containers*/);
}
sql = getExpSchema().getSqlDialect().limitRows(sql, SearchService.INDEXING_LIMIT);
SqlSelector selector = new SqlSelector(getExpSchema().getScope(), sql);
selector.setJdbcCaching(false);
MutableLong maxRowIdProcessed = new MutableLong(minRowId);

// Work in modest block sizes and fetch as a list so we don't keep the ResultSet open, which could lock the tables
List<Material> materials = selector.getArrayList(Material.class);
materials.forEach(m -> {
ExpMaterialImpl impl = new ExpMaterialImpl(m);
impl.index(queue, null /* null tableInfo since samples may belong to multiple containers*/);
maxRowIdProcessed.setValue(Math.max(maxRowIdProcessed.longValue(), impl.getRowId()));
});

if (materials.size() == SearchService.INDEXING_LIMIT)
{
// Requeue for the next batch. This avoids overwhelming the indexer's queue with documents
queue.addRunnable((q) -> indexSampleTypeMaterials(sampleType, q, maxRowIdProcessed.longValue()));
}
}


Expand Down