
[Improve] Gremlin Task Large Result Chunking #3071

Description

@hllqkb


Before submit

  • I have confirmed and searched that there are no similar problems in the historical issue and documents

Environment


Problem Description

When a Gremlin async task returns large results (e.g., a g.V().valueMap() query on a graph with millions of vertices), the entire result set is JSON-serialized, compressed, and stored as a single vertex property (~task_result). This has three compounding bottlenecks:

| Bottleneck | Code location | Current limit |
|---|---|---|
| Result item count | GremlinJob.TASK_RESULTS_MAX_SIZE | 800,000 (hardcoded to Query.DEFAULT_CAPACITY) |
| Compressed storage size | CoreOptions.TASK_RESULT_SIZE_LIMIT | 16 MB (default) |
| Single property maximum | BytesBuffer.BYTES_LEN_MAX | 10 MB |

Note: The 800,000 item count limit is orthogonal to chunking — it limits the in-memory ArrayList during Gremlin execution and is not addressed by this issue. Chunking addresses the storage/serialization bottlenecks below it.

Current behavior (GremlinJob.execute):

List<Object> results = new ArrayList<>();
while (traversal.hasNext()) {
    results.add(traversal.next());    // all in memory, up to 800K entries
    checkResultsSize(results);
}
return results;  // full list -> HugeTask.set() -> JSON serialize -> compress -> single ~task_result

Actual problems:

  1. A query returning 500K vertices with 10 properties each produces a ~50MB JSON string. JsonUtil.toJson(result) serializes it all in memory. After compression, checkPropertySize in asArray() checks against TASK_RESULT_SIZE_LIMIT (16MB default). If the compressed result exceeds this, a LimitExceedException is thrown and the task fails.

  2. On GET /tasks/{id}?with_result=true, the entire ~task_result blob is decompressed and deserialized. For a 50MB result, this takes seconds and can cause timeout on slow connections, especially with the RocksDB backend where the blob is read through multiple serialization layers.

  3. Users cannot incrementally consume large results. The only option is to increase task.result_size_limit (up to 1GB) and accept the memory/performance cost.

Expected behavior:

  1. Large task results are stored in configurable-size chunks (default 1 MB), each stored as a separate vertex property (~task_result_0, ~task_result_1, ...).
  2. The REST API supports paginated retrieval: GET /tasks/{id}?with_result=true&page=0&page_size=1000.
  3. Small results (< chunk threshold) continue to use the existing single-property path — fully backward compatible.

Root Cause Analysis

The root cause is in HugeTask.asArray() (line 568) and HugeTask.set() (line 372):

// HugeTask.set() - result is set as a single string
protected void set(V v) {
    String result = JsonUtil.toJson(v);          // full JSON in one string
    checkPropertySize(result, P.RESULT);         // 16MB check
    this.result = result;
    super.set(v);
}

// HugeTask.asArray() - stored as a single vertex property
if (this.result != null) {
    byte[] bytes = StringEncoding.compress(this.result);  // single blob
    checkPropertySize(bytes.length, P.RESULT);
    list.add(P.RESULT);
    list.add(bytes);
}

The task vertex property model already supports multiple properties (it uses a List<Object> key-value pair list). The limitation is purely in the serialization code — there is no splitting logic. PR #3060 added asArrayWithoutResult() for the metadata-only path but left the result storage unchanged.


Proposed Solution

Phase 1: Chunked Storage

When the compressed form of this.result (the JSON string, already fully in memory) exceeds a configurable threshold, task.result_chunk_size (default: 1 MB), the compressed bytes are split into chunks and stored as ~task_result_0, ~task_result_1, ... instead of a single ~task_result. An independent metadata property, ~task_result_chunk_count, stores the number of chunks.

Splitting strategy — byte-level in Phase 1-2:
In Phase 1-2 the full JSON string (this.result) already exists in memory. We first compress it, then split the compressed byte array at chunk boundaries. This is safe because the split never reorders bytes: concatenating all chunk byte arrays in index order and decompressing them as one stream yields the original JSON. The converse does not hold. An individual chunk is only a fragment of the compressed stream and cannot be decompressed on its own, so reassembly must concatenate first and decompress once. JSON-element-level splitting is deferred to Phase 4 (streaming write), where the full JSON string won't exist yet and splitting must happen at parse time.
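The round-trip property this relies on can be checked in isolation. The sketch below is a minimal standalone illustration, not HugeGraph code. It assumes GZIP from java.util.zip as a stand-in for StringEncoding.compress()/decompress(), whose actual codec may differ. The class and method names (CompressedChunking, split, reassemble) are hypothetical. The sketch splits the compressed bytes at fixed boundaries, concatenates them back in index order, and decompresses once.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class CompressedChunking {

    // Stand-in for StringEncoding.compress(); GZIP is an assumption
    static byte[] compress(String text) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    // Stand-in for StringEncoding.decompress()
    static String decompress(byte[] bytes) {
        try (GZIPInputStream gzip =
                 new GZIPInputStream(new ByteArrayInputStream(bytes))) {
            return new String(gzip.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Byte-level split at fixed boundaries; bytes are never reordered
    static List<byte[]> split(byte[] compressed, int chunkSize) {
        List<byte[]> chunks = new ArrayList<>();
        for (int start = 0; start < compressed.length; start += chunkSize) {
            int end = Math.min(start + chunkSize, compressed.length);
            chunks.add(Arrays.copyOfRange(compressed, start, end));
        }
        return chunks;
    }

    // Concatenate in index order FIRST, then decompress the whole stream once
    static String reassemble(List<byte[]> chunks) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) {
            out.write(chunk, 0, chunk.length);
        }
        return decompress(out.toByteArray());
    }
}
```

Passing a single chunk to decompress() fails with an IOException in this sketch, because no chunk is a complete compressed stream by itself.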

Storage model:

Small result (<= 1 MB compressed):
  ~task_result = <compressed full result>              // UNCHANGED

Large result (> 1 MB compressed):
  ~task_result_0 = <compressed chunk 0>                // NEW
  ~task_result_1 = <compressed chunk 1>
  ~task_result_2 = <compressed chunk 2>
  ~task_result_chunk_count = 3                         // NEW: dedicated marker (int)

Phase 2: Paginated API

Add page and page_size query parameters to GET /tasks/{id}:

GET /tasks/{id}?with_result=true&page=0&page_size=1000

Response includes pagination metadata (page, page_size, total). When page_size is absent, the full result is returned (reassembled from chunks if needed) — backward compatible.

Pagination is implemented as stateless parameters passed through asMap(), with no mutable transient fields on HugeTask. No per-request pagination state is stored on the shared task object.
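A client could consume a large result page by page, using the pagination.total field to know when to stop. The sketch below assumes the response shape described above: a result array plus pagination: { page, page_size, total }. Page, PageFetcher and fetchAll are hypothetical names. The HTTP call is abstracted behind PageFetcher rather than tied to a specific client library.

```java
import java.util.ArrayList;
import java.util.List;

final class PagedResultClient {

    // One page of a paginated task response: the result items plus
    // pagination.total (hypothetical client-side view of the response)
    record Page(List<Object> items, int total) {}

    // Hypothetical transport hook: a real client would issue
    // GET /tasks/{id}?with_result=true&page=P&page_size=S and parse the JSON
    @FunctionalInterface
    interface PageFetcher {
        Page fetch(int page, int pageSize);
    }

    // Request pages 0, 1, 2, ... until (page + 1) * pageSize covers total
    static List<Object> fetchAll(PageFetcher fetcher, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        List<Object> all = new ArrayList<>();
        for (int page = 0; ; page++) {
            Page p = fetcher.fetch(page, pageSize);
            all.addAll(p.items());
            boolean covered = (long) (page + 1) * pageSize >= p.total();
            // An empty page also stops the loop, guarding against a shrinking total
            if (covered || p.items().isEmpty()) {
                return all;
            }
        }
    }
}
```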

Phase 3: Validation

Manual end-to-end verification and full regression test suite run.

Phase 4 (Future): Streaming Write

Modify GremlinJob.execute() to write results in batches to avoid holding the full list in memory. Deferred to keep this task scoped. At that point, JSON-element-level chunk splitting will be needed since the full JSON string won't be available in memory.

Key design constraints:

  1. Backward compatibility: existing single-property tasks continue to work
  2. Cross-backend: all changes at the vertex property abstraction level — works across RocksDB, MySQL, PostgreSQL, Cassandra, HBase
  3. Byte-level chunking in Phase 1-2 (safe for compressed+decompressed round-trip); JSON-level chunking deferred to Phase 4 when streaming writes require it
  4. No pagination state on HugeTask: page and page_size flow through method parameters, not transient fields
  5. Chunk count stored as independent property (~task_result_chunk_count) to avoid traversal-order ambiguity

Files to Modify

| File | LOC | Changes |
|---|---|---|
| hugegraph-core/.../task/HugeTask.java | 873 | Chunked property read/write, reassembly, pagination via asMap() params |
| hugegraph-api/.../api/job/TaskAPI.java | 222 | page, page_size query params, new response format |
| hugegraph-core/.../config/CoreOptions.java | 742 | New config: task.result_chunk_size (1 MB default) |
| hugegraph-test/.../core/TaskCoreTest.java | 816 | Unit tests: chunked storage, reassembly, pagination, backward compat |
| hugegraph-test/.../api/TaskApiTest.java | 189 | API integration tests for pagination |

Impact

BEFORE - Large Gremlin task result (> 16MB compressed):
  T=0   GremlinJob loads all results into ArrayList
  T=1   JsonUtil.toJson(result) produces ~50MB JSON string
  T=2   StringEncoding.compress(result) → 16MB+ compressed blob
  T=3   checkPropertySize(bytes.length, P.RESULT) → LimitExceedException
  T=4   Task status: FAILED / "Task result size exceeded limit"
  User sees: 500 Internal Server Error

AFTER - Large Gremlin task result (> 16MB compressed):
  T=0   GremlinJob loads results (unchanged for Phase 1-3)
  T=1   JsonUtil.toJson(result) succeeds
  T=2   StringEncoding.compress(result) → large compressed blob
  T=3   HugeTask.asArray() splits compressed bytes into 1MB chunks
  T=4   Stored as ~task_result_0 ... ~task_result_N + ~task_result_chunk_count
  User sees: GET /tasks/{id}?page=0&page_size=1000 -> first 1000 items

Related

| Issue/PR | Description |
|---|---|
| #3060 | Added with_result=false + metadata-only task reads (this task builds on it) |
| #3049 | Made serializer buffer capacity configurable (orthogonal) |
| #2764 | Good First Issue list (self-proposed task model) |

Implementation Plan & Progress

Implementation Plan
Breaking the work into independently reviewable and testable chunks. Each chunk has its own test, can be reviewed in isolation, and builds on the previous one.


Chunk 1 — Add task.result_chunk_size config option

File: hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java

Problem: No configuration exists to control result chunking behavior. The system always serializes and stores the entire task result as a single vertex property regardless of size.

Fix: Add TASK_RESULT_CHUNK_SIZE static option with default 1048576 (1 MB), range [0, BytesBuffer.BYTES_LEN_MAX] (up to 10 MB — must stay within single-property byte limit since each chunk is stored as an individual vertex property). Value 0 disables chunking entirely (preserves current behavior). Register the option in registerOptions().

public static final ConfigOption<Long> TASK_RESULT_CHUNK_SIZE =
        new ConfigOption<>(
                "task.result_chunk_size",
                "Max size in bytes per result chunk. " +
                "0 disables chunking. Default 1MB. " +
                "Max is BYTES_LEN_MAX (10MB) since each " +
                "chunk is a single vertex property.",
                // Cast so both rangeInt() bounds have the same type (Long)
                rangeInt(0L, (long) BytesBuffer.BYTES_LEN_MAX),
                Bytes.MB
        );

Test: Verify option is registered and default value is 1 MB. Verify range validation rejects negative values and values > 10 MB.


Chunk 2 — Add property name helpers and chunk count key to HugeTask

File: hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeTask.java

Problem: There is no infrastructure for naming, detecting, or counting chunked properties. All result properties are currently accessed via a single P.RESULT key.

Fix: Add static helpers and a new property constant:

  • chunkKey(int index) — returns "~task_result_" + index for chunk properties; returns P.RESULT ("~task_result") for index < 0 (the legacy key)
  • isChunkedProperty(String key) — returns true if key matches ~task_result_ followed by one or more digits
  • P.RESULT_CHUNK_COUNT — new property constant "~task_result_chunk_count", a dedicated metadata property storing the number of chunks (independent from data chunks — avoids traversal-order ambiguity)

No behavior change — these are pure helpers used by subsequent chunks.

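Test: Verify key generation: chunkKey(0) → "~task_result_0", chunkKey(-1) → "~task_result". Verify isChunkedProperty("~task_result_0") → true, isChunkedProperty("~task_result") → false, isChunkedProperty("~task_name") → false. Because the count marker shares the same prefix, also verify isChunkedProperty("~task_result_chunk_count") → false.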
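A minimal standalone sketch of the Chunk 2 helpers follows, assuming the key formats given above. The class name ChunkKeys and the chunkIndex helper, which parses the index back out of a key, are hypothetical additions. The regex matches only digits after the prefix, and this is what keeps the marker ~task_result_chunk_count from being mistaken for a data chunk.

```java
import java.util.regex.Pattern;

final class ChunkKeys {

    static final String RESULT = "~task_result";
    static final String RESULT_CHUNK_COUNT = "~task_result_chunk_count";

    // "~task_result_" followed by one or more digits, and nothing else
    private static final Pattern CHUNK_KEY = Pattern.compile("~task_result_\\d+");

    // index < 0 returns the legacy single-property key
    static String chunkKey(int index) {
        return index < 0 ? RESULT : RESULT + "_" + index;
    }

    static boolean isChunkedProperty(String key) {
        return key != null && CHUNK_KEY.matcher(key).matches();
    }

    // Hypothetical helper: "~task_result_12" -> 12
    static int chunkIndex(String key) {
        if (!isChunkedProperty(key)) {
            throw new IllegalArgumentException("Not a chunk key: " + key);
        }
        return Integer.parseInt(key.substring(RESULT.length() + 1));
    }
}
```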


Chunk 3 — Implement chunked write in HugeTask.asArray()

File: hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeTask.java

Problem: HugeTask.asArray() (line 568) always writes the entire compressed result as a single ~task_result vertex property, with no mechanism to split large results.

Fix: After compressing this.result into byte[] bytes, check if chunking is enabled (task.result_chunk_size > 0) and the compressed size exceeds the threshold. If so, split the compressed byte array at chunk-size boundaries (byte-level split — this is safe because decompression of the concatenated bytes yields the original JSON). Write each chunk as ~task_result_N, then write ~task_result_chunk_count as an independent marker property.

If chunking is disabled or the compressed result does not exceed the threshold, the original single-property path runs unchanged. Note: do NOT call checkPropertySize on individual chunks, because each chunk is guaranteed to be ≤ task.result_chunk_size ≤ BYTES_LEN_MAX.

if (this.result != null) {
    byte[] bytes = StringEncoding.compress(this.result);
    long chunkSize = this.config.get(CoreOptions.TASK_RESULT_CHUNK_SIZE);

    if (chunkSize > 0 && bytes.length > chunkSize) {
        // Chunked path — byte-level split of compressed data
        int chunkCount = (int) Math.ceil((double) bytes.length / chunkSize);
        for (int i = 0; i < chunkCount; i++) {
            int start = i * (int) chunkSize;
            int end = Math.min(start + (int) chunkSize, bytes.length);
            list.add(chunkKey(i));
            list.add(Arrays.copyOfRange(bytes, start, end));
        }
        list.add(P.RESULT_CHUNK_COUNT);
        list.add(chunkCount);
    } else {
        // Original single-property path
        checkPropertySize(bytes.length, P.RESULT);
        list.add(P.RESULT);
        list.add(bytes);
    }
}

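Test: Create a task whose compressed result exceeds task.result_chunk_size. Highly repetitive JSON compresses well, so either generate a large enough result or lower the chunk size in the test config. Verify the vertex has ~task_result_0 through ~task_result_N properties plus ~task_result_chunk_count. Verify that a result whose compressed size is ≤ the chunk size still uses the single ~task_result property.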
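The write-side branching can be prototyped outside HugeTask as a pure function that builds the flat key/value list. This sketch relies on the assumptions above: keys as in Chunk 2, and the chunk count stored as an int. ChunkedResultWriter and resultProperties are hypothetical names, and compression is assumed to have already happened. The sketch uses integer ceiling division instead of Math.ceil. This gives the same count without a floating-point round trip.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ChunkedResultWriter {

    static final String RESULT = "~task_result";
    static final String RESULT_CHUNK_COUNT = "~task_result_chunk_count";

    // Builds the flat [key, value, key, value, ...] list for the result,
    // given already-compressed bytes and task.result_chunk_size
    static List<Object> resultProperties(byte[] compressed, long chunkSize) {
        List<Object> list = new ArrayList<>();
        if (chunkSize > 0 && compressed.length > chunkSize) {
            // Safe narrowing: chunkSize < compressed.length <= Integer.MAX_VALUE
            int size = (int) chunkSize;
            // Integer ceil(length / size); length >= 1 here
            int chunkCount = (compressed.length - 1) / size + 1;
            for (int i = 0; i < chunkCount; i++) {
                int start = i * size;
                int end = Math.min(start + size, compressed.length);
                list.add(RESULT + "_" + i);
                list.add(Arrays.copyOfRange(compressed, start, end));
            }
            list.add(RESULT_CHUNK_COUNT);
            list.add(chunkCount);
        } else {
            // Disabled or small enough: legacy single-property layout
            list.add(RESULT);
            list.add(compressed);
        }
        return list;
    }
}
```

The comparison is a strict >. A compressed result of exactly task.result_chunk_size bytes therefore stays on the single-property path, which matches the "<= 1 MB compressed" case in the storage model.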


Chunk 4 — Implement chunked read in HugeTask.property()

File: hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeTask.java

Problem: HugeTask.property() (line 451-504) only handles the single P.RESULT key. Chunk property keys (~task_result_N) fall through to the default branch which throws AssertionError("Unsupported key: " + key). There is no logic to detect, buffer, and reassemble chunked properties when loading a task from the backend.

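Fix: Add chunk buffering infrastructure and modify the property() method:

  1. Add two instance fields: private NavigableMap<Integer, byte[]> resultChunks (buffers compressed chunk bytes keyed by chunk index) and private int resultChunkCount = -1 (expected count from the marker).
  2. In property(String key, Object value):
    • When isChunkedProperty(key) is true: parse the index from the key and buffer the (index, bytes) pair into resultChunks. Do NOT throw.
    • When key.equals(P.RESULT_CHUNK_COUNT): record resultChunkCount from the value.
    • After either case, once chunks 0..resultChunkCount-1 have all been buffered: concatenate the compressed chunks in index order, decompress the combined bytes once, and set this.result to the resulting JSON string. Chunks cannot be decompressed individually because the split is byte-level.
    • When key.equals(P.RESULT): handle as before (legacy single-property path, which also sets this.result).
    • All other keys: delegate to existing switch-case handling (TYPE, NAME, STATUS, etc.).

// Needs: java.io.ByteArrayOutputStream, java.util.NavigableMap, java.util.TreeMap
// New instance fields: compressed chunk bytes keyed by chunk index.
// A TreeMap keeps chunks ordered by index regardless of the order in
// which the backend returns the vertex properties.
private NavigableMap<Integer, byte[]> resultChunks;
private int resultChunkCount = -1;

// Modified property() method: cases for chunk properties
if (isChunkedProperty(key)) {
    if (this.resultChunks == null) {
        this.resultChunks = new TreeMap<>();
    }
    // Parse the index from the key suffix: "~task_result_3" -> 3
    int index = Integer.parseInt(key.substring(P.RESULT.length() + 1));
    this.resultChunks.put(index, ((Blob) value).bytes());
    this.tryReassembleResult();
    return;  // do NOT fall through to switch/default
}
if (key.equals(P.RESULT_CHUNK_COUNT)) {
    // Number cast tolerates the backend returning Integer or Long
    this.resultChunkCount = ((Number) value).intValue();
    this.tryReassembleResult();
    return;
}
// ... existing P.RESULT case unchanged ...
// ... existing switch/default for other keys unchanged ...

private void tryReassembleResult() {
    NavigableMap<Integer, byte[]> chunks = this.resultChunks;
    int count = this.resultChunkCount;
    // Wait until the marker and every index 0..count-1 have been seen
    if (chunks == null || count <= 0 || chunks.size() != count ||
        chunks.firstKey() != 0 || chunks.lastKey() != count - 1) {
        return;
    }
    // Byte-level chunks are not independently decompressible:
    // concatenate in index order first, then decompress once
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (byte[] chunk : chunks.values()) {
        out.write(chunk, 0, chunk.length);
    }
    this.result = StringEncoding.decompress(out.toByteArray());
    this.resultChunks = null;  // release the buffered chunks
}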

The fromVertex() method (line 731-755) iterates over all vertex properties and calls property() for each. With this fix, chunk properties are collected and reassembled automatically during iteration.

Test: Verify chunked task can be loaded from backend — this.result is correctly reassembled. Verify legacy single-property task still loads correctly. Verify partial chunk load (e.g., task vertex with chunk_count but missing some chunks) does not produce corrupted result.
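The out-of-order and partial-load cases can be exercised without a backend. The sketch below is a hypothetical standalone buffer. ChunkBuffer is not a HugeGraph class. The sketch mirrors the buffering approach above: chunks are keyed by index in a TreeMap, and assembly only succeeds once the count marker is known and every index 0..count-1 is present. It returns the concatenated compressed bytes, and decompression would follow as a separate step.

```java
import java.io.ByteArrayOutputStream;
import java.util.TreeMap;

final class ChunkBuffer {

    // Compressed chunk bytes keyed by index; TreeMap iterates in index order
    private final TreeMap<Integer, byte[]> chunks = new TreeMap<>();
    private int expectedCount = -1;  // -1 until the count marker is seen

    void putChunk(int index, byte[] bytes) {
        chunks.put(index, bytes);
    }

    void putCount(int count) {
        expectedCount = count;
    }

    // Returns the concatenated compressed bytes once indexes
    // 0..expectedCount-1 are all present; otherwise null
    byte[] tryAssemble() {
        if (expectedCount <= 0 || chunks.size() != expectedCount
                || chunks.firstKey() != 0
                || chunks.lastKey() != expectedCount - 1) {
            return null;
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] chunk : chunks.values()) {
            out.write(chunk, 0, chunk.length);
        }
        return out.toByteArray();
    }
}
```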


Chunk 5 — Update HugeTask.asMap() for chunked/legacy compatibility

File: hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeTask.java

Problem: HugeTask.asMap() (line 652-704) calls this.result to get the stored result string. This path must work identically whether the underlying storage is chunked or legacy single-property.

Fix: No code change needed in asMap() if Chunk 4 correctly restores this.result during property() processing. The asMap() method's existing this.result access will return the full reassembled JSON string. Key verification: confirm asMap(true, false) (the with_result=false path from #3060) returns metadata without result for chunked tasks.

Test: Verify asMap(true, true) returns full result for chunked task. Verify asMap(true, false) returns metadata without result for chunked task. Verify asMap() with legacy single-property task still works.


Chunk 6 — Add unit tests for chunked storage

File: hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/TaskCoreTest.java

Problem: No test coverage for chunked result storage, reassembly, or backward compatibility.

Fix: Add test methods:

  • testTaskResultChunked(): create a task whose compressed result exceeds task.result_chunk_size. Lower the chunk size in the test if needed, because repetitive JSON compresses well. Verify the task vertex stores ~task_result_0, ~task_result_1, ... plus ~task_result_chunk_count. Verify task.result() returns the full reassembled result.
  • testTaskResultSmall(): create a task whose compressed result is ≤ task.result_chunk_size. Verify the task vertex stores a single ~task_result property (backward compat). Verify task.result() returns the correct value.
  • testTaskResultChunkReassembly(): create a task with a known result and verify that the reassembled result equals the original exactly (byte-level).
  • testTaskResultBackwardCompat(): directly construct a task vertex with a legacy single ~task_result property. Load it. Verify task.result() works.
  • testTaskResultChunkDisabled(): set task.result_chunk_size to 0. Create a large task. Verify the single-property path is used.

Test data: Generate result strings of known sizes using a helper method that creates a JSON array with N elements, e.g., ["result-item-00000", "result-item-00001", ...].
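A minimal sketch of such a helper follows, assuming the element format shown above. ResultFixtures and jsonArray are hypothetical names. For 1 ≤ n ≤ 100,000 the output is exactly 21 × n characters, which makes sizes easy to predict. Chunking is decided on the compressed size, and this data is highly repetitive. A test that needs chunks will therefore probably have to lower task.result_chunk_size rather than rely on raw string length.

```java
final class ResultFixtures {

    // Builds ["result-item-00000", "result-item-00001", ...] with n elements
    static String jsonArray(int n) {
        StringBuilder sb = new StringBuilder(n * 21 + 2);
        sb.append('[');
        for (int i = 0; i < n; i++) {
            if (i > 0) {
                sb.append(", ");
            }
            sb.append('"').append(String.format("result-item-%05d", i)).append('"');
        }
        return sb.append(']').toString();
    }
}
```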


Chunk 7 — Add pagination logic to HugeTask.asMap()

File: hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeTask.java

Problem: HugeTask has no pagination support. The entire result is always returned.

Fix: Add pagination through method parameters. No mutable transient fields are added, so no per-request pagination state is stored on the shared HugeTask:

public synchronized Map<String, Object> asMap(boolean withDetails,
                                               boolean withResult,
                                               int page, int pageSize) {
    // ... existing code to build map ...
    if (withResult && this.result != null) {
        Object parsed = JsonUtil.fromJson(this.result, Object.class);
        if (page >= 0 && pageSize > 0 && parsed instanceof List) {
            List<?> list = (List<?>) parsed;
            // Clamp both bounds (computed in long to avoid int overflow) so
            // an out-of-range page yields an empty list instead of an
            // IndexOutOfBoundsException from subList()
            int from = (int) Math.min((long) page * pageSize, list.size());
            int to = (int) Math.min((long) from + pageSize, list.size());
            map.put(Hidden.unHide(P.RESULT), list.subList(from, to));
            Map<String, Object> pagination = new HashMap<>();
            pagination.put("page", page);
            pagination.put("page_size", pageSize);
            pagination.put("total", list.size());
            map.put("pagination", pagination);
        } else {
            // No valid pagination requested, or the result is not a JSON array
            map.put(Hidden.unHide(P.RESULT), parsed);
        }
    }
    return map;
}

Update the existing overloads to call through with page=-1, pageSize=-1:

public Map<String, Object> asMap() {
    return this.asMap(true, true, -1, -1);
}
public synchronized Map<String, Object> asMap(boolean withDetails) {
    return this.asMap(withDetails, true, -1, -1);
}
public synchronized Map<String, Object> asMap(boolean withDetails, boolean withResult) {
    return this.asMap(withDetails, withResult, -1, -1);
}

Test: Verify pagination with known list size: page=0, pageSize=10 returns first 10 items. page=2, pageSize=10 returns items 20-29. Verify metadata shows correct total. Verify page=-1 returns all items (no pagination key in response).
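The slicing step can be isolated into a pure function and unit-tested without a running server. This is a sketch, not the HugeTask code. ResultPager, slice and paginationInfo are hypothetical names. The sketch clamps both bounds and does the multiplication in long. An out-of-range page, such as page=999 in the manual checks, therefore yields an empty list instead of an exception.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ResultPager {

    // Returns items[page * pageSize, page * pageSize + pageSize) clamped to
    // the list bounds; page < 0 or pageSize <= 0 means "no pagination"
    static <T> List<T> slice(List<T> items, int page, int pageSize) {
        if (page < 0 || pageSize <= 0) {
            return items;
        }
        // long arithmetic so large page numbers cannot overflow int
        int from = (int) Math.min((long) page * pageSize, items.size());
        int to = (int) Math.min((long) from + pageSize, items.size());
        return items.subList(from, to);
    }

    // Pagination metadata with the page / page_size / total keys
    static Map<String, Object> paginationInfo(int page, int pageSize, int total) {
        Map<String, Object> info = new LinkedHashMap<>();
        info.put("page", page);
        info.put("page_size", pageSize);
        info.put("total", total);
        return info;
    }
}
```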


Chunk 8 — Add page and page_size query params to TaskAPI.get()

File: hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/job/TaskAPI.java

Problem: GET /tasks/{id} has no pagination query parameters.

Fix: Add two new @QueryParam annotations to the existing get() method:

@QueryParam("page") @DefaultValue("-1") int page,
@QueryParam("page_size") @DefaultValue("-1") int pageSize

Pass them directly to asMap() (no transient field setters — clean stateless flow):

@GET
@Timed
@Path("{id}")
@Produces(APPLICATION_JSON_WITH_CHARSET)
public Map<String, Object> get(@Context GraphManager manager,
                               @PathParam("graphspace") String graphSpace,
                               @PathParam("graph") String graph,
                               @PathParam("id") long id,
                               @DefaultValue("true") @QueryParam("with_result")
                               boolean withResult,
                               @DefaultValue("-1") @QueryParam("page") int page,
                               @DefaultValue("-1") @QueryParam("page_size")
                               int pageSize) {
    LOG.debug("Graph [{}] get task: {}", graph, id);
    TaskScheduler scheduler = graph(manager, graphSpace, graph)
            .taskScheduler();
    return scheduler.task(IdGenerator.of(id), withResult)
                    .asMap(true, withResult, page, pageSize);
}

Test: Verify GET /tasks/{id}?with_result=true&page=0&page_size=10 returns first page with pagination metadata. Verify GET /tasks/{id}?with_result=true (no pagination params) returns full result (no pagination key). Verify GET /tasks/{id}?with_result=false&page=0&page_size=10 does not include result (metadata-only always takes precedence).


Chunk 9 — Simplify TaskScheduler (no new overload needed)

File: hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskScheduler.java + StandardTaskScheduler.java

Problem: With Chunk 7's design passing pagination through asMap() parameters (not transient fields), no new scheduler overload is needed — the existing task(Id id, boolean withResult) remains sufficient.

Fix: No code changes needed in TaskScheduler or StandardTaskScheduler. Pagination is purely an API-layer concern handled upstream in TaskAPI.get().

Rationale: The scheduler's job is to load the task from the store and return the object. Pagination is a presentation concern — it should live in asMap() and the REST layer, not in the scheduler interface.

Test: Verify existing scheduler tests pass. Verify the full call chain TaskAPI.get() → scheduler.task() → task.asMap(true, withResult, page, pageSize) works end-to-end.


Chunk 10 — Add API integration tests for pagination

File: hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/api/TaskApiTest.java

Problem: No test coverage for paginated result retrieval via REST API.

Fix: Add test methods:

  • testGetWithPagination() — create a Gremlin task that returns 100 items. Call GET /tasks/{id}?with_result=true&page=0&page_size=30. Assert 30 items returned. Call page=3&page_size=30. Assert 10 items returned (last page has remainder).
  • testGetPaginationMetadata() — create task with known result. Verify response includes pagination object with correct total, page, page_size.
  • testGetWithoutPagination() — create task with pagination-capable result. Call without page/page_size params. Assert full result returned (backward compat — no pagination key present).
  • testGetInvalidPage() — call with page=-1&page_size=10. Verify full result returned (graceful fallback to full result).

Chunk 11 — Manual verification: chunked storage

Manual test procedure (no automated test — verifies the full end-to-end storage path):

  1. Start HugeGraph server with RocksDB backend
  2. Create a schema and insert test data (e.g., 50K vertices with labels)
  3. Submit a Gremlin async task: g.V().hasLabel("test").valueMap()
  4. Wait for task to SUCCESS
  5. Inspect task vertex properties via GET /graphs/hugegraph/graph/vertices/{task_id} (raw vertex API or debug log)
  6. Verify ~task_result_0, ~task_result_1, ... ~task_result_N exist (if the compressed result exceeds 1 MB) or a single ~task_result exists (if it is ≤ 1 MB)
  7. Verify ~task_result_chunk_count = N + 1 (if chunked)
  8. Run GET /tasks/{id}?with_result=true and verify the full result equals the query result

Chunk 12 — Manual verification: paginated REST API

Manual test procedure:

  1. Use the task from Chunk 11 (with chunked result)
  2. GET /tasks/{id}?with_result=true&page=0&page_size=100 → verify first 100 items
  3. GET /tasks/{id}?with_result=true&page=1&page_size=100 → verify items 100-199 (different from page 0)
  4. GET /tasks/{id}?with_result=true&page=999&page_size=100 → verify empty array returned with correct total
  5. GET /tasks/{id}?with_result=true (no pagination) → verify full result, no pagination key in response
  6. Verify each page response includes pagination: { page, page_size, total }

Chunk 13 — Run full test suite

Commands:

# Core tests (task framework)
mvn test -pl hugegraph-server/hugegraph-test -am -P core-test,rocksdb

# API tests (REST endpoints)
mvn test -pl hugegraph-server/hugegraph-test -am -P api-test,rocksdb

# Full test suite
mvn test -pl hugegraph-server/hugegraph-test -am -P core-test,api-test,rocksdb

Expected: All existing tests pass. New tests from Chunk 6 and Chunk 10 pass. No regressions in unrelated tests.

Metadata


Assignees: No one assigned
Labels: gremlin (TinkerPop gremlin), improvement (General improvement)
Type: No type
Projects: No projects
Milestone: No milestone
Relationships: None yet
Development: No branches or pull requests
