From 0edfb4990b18e58f7f87dde040a31adcd4eab02d Mon Sep 17 00:00:00 2001 From: Thanatat Tamtan Date: Sat, 13 Jun 2026 13:23:57 +0700 Subject: [PATCH] fix: classify permanent ingest failures; cut a per-flush allocation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three hardening changes from the perf/reliability review. 1. HTTP failure taxonomy (the important one). flush() treated EVERY non-200 as retryable, so retryFlush(false) retried forever. A single misconfig — a wrong index (404), an expired token (401), or a schema-rejected batch (400/422) — parked a worker in the infinite-retry loop, and with the default 2 workers a couple of poison batches froze all ingest and blocked every Ingest caller. flush() now treats 400/401/403/404/409/422 as permanent: it settles tracked items with a new *IngestError{Reason: ReasonServer}, invokes OnDiscard for fire-and-forget items, and drops the batch so the worker keeps draining. 5xx, 408, 425, 429, 413 and transport errors stay retryable (413 keeps its auto-reduce path). Tracked callers now get a definitive verdict instead of an ambiguous context timeout. 2. Drop a per-flush allocation. flush() built a fresh `encoded` slice (~48KB + a full copy at the default batch size) every round-trip just to replay the settle list. Removed: on HTTP 200 we settle over `batch` directly, which is identical because encode-failure and fire-and-forget items have ack==nil so settle(nil) is a no-op for them. 3. Recover a 413-reduced batch size on the ticker instead of on a later successful flush. flush() never runs on an empty buffer, so after a transient 413 spike followed by quiet traffic the batch could stay shrunk (down to 10%) indefinitely. The reset has moved out of the flush success path into the worker's ticker branch (it no longer runs inside flush), so it fires within ~one maxDelay of the window expiring, whether or not any traffic arrives. Fire-and-forget batching, ordering, the 413 split, and the IngestSync at-least-once / exactly-once-settle invariants are unchanged. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- ingest_reliability_test.go | 172 +++++++++++++++++++++++++++++++++++++ quickwit.go | 80 +++++++++++++---- 2 files changed, 237 insertions(+), 15 deletions(-) create mode 100644 ingest_reliability_test.go diff --git a/ingest_reliability_test.go b/ingest_reliability_test.go new file mode 100644 index 0000000..c3945db --- /dev/null +++ b/ingest_reliability_test.go @@ -0,0 +1,172 @@ +package quickwit_test + +import ( + "context" + "errors" + "io" + "net/http" + "net/http/httptest" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/moonrhythm/quickwit" +) + +// Core: a permanent 4xx is reported as ReasonServer promptly, not retried until +// the context deadline. +func TestIngestSync_PermanentStatusReturnsServer(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + io.Copy(io.Discard, r.Body) + w.WriteHeader(http.StatusBadRequest) // 400: permanent + })) + defer server.Close() + + c := quickwit.NewClient(server.URL + "/api/v1/test") + c.SetConcurrent(1) + defer c.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + start := time.Now() + err := c.IngestSync(ctx, map[string]any{"index": 0}) + elapsed := time.Since(start) + + var ie *quickwit.IngestError + if !errors.As(err, &ie) || ie.Reason != quickwit.ReasonServer { + t.Fatalf("err = %v, want *IngestError{ReasonServer}", err) + } + if elapsed > 2*time.Second { + t.Errorf("took %v, want a prompt permanent verdict (not a ctx timeout)", elapsed) + } +} + +// Core: a permanent rejection must not wedge the worker — a second call still +// gets a verdict rather than blocking behind an infinite retry of the first. 
+func TestIngestSync_PermanentStatusDoesNotWedgeWorker(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + io.Copy(io.Discard, r.Body) + w.WriteHeader(http.StatusNotFound) // 404: permanent (e.g. wrong index) + })) + defer server.Close() + + c := quickwit.NewClient(server.URL + "/api/v1/test") + c.SetConcurrent(1) // a single worker: if the first call wedges it, the second hangs + defer c.Close() + + for i := 0; i < 2; i++ { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + err := c.IngestSync(ctx, map[string]any{"index": i}) + cancel() + var ie *quickwit.IngestError + if !errors.As(err, &ie) || ie.Reason != quickwit.ReasonServer { + t.Fatalf("call %d: err = %v, want *IngestError{ReasonServer}", i, err) + } + } +} + +// Core: a 5xx stays retryable — it must NOT be treated as a permanent rejection. +func TestIngestSync_ServerErrorIsRetriedNotPermanent(t *testing.T) { + var attempts atomic.Int64 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + io.Copy(io.Discard, r.Body) + if attempts.Add(1) < 3 { + w.WriteHeader(http.StatusInternalServerError) // transient + return + } + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + c := quickwit.NewClient(server.URL + "/api/v1/test") + c.SetConcurrent(1) + c.SetBatchSize(1) + defer c.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := c.IngestSync(ctx, map[string]any{"index": 0}); err != nil { + t.Fatalf("IngestSync returned %v, want nil after retries", err) + } + if n := attempts.Load(); n < 3 { + t.Errorf("server saw %d attempts, want >= 3 (two 500s then success)", n) + } +} + +// Core: fire-and-forget items on a permanent rejection are surfaced via +// OnDiscard and do not wedge the worker; Close returns promptly. 
+func TestIngest_PermanentStatusDiscardsFireAndForget(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + io.Copy(io.Discard, r.Body) + w.WriteHeader(http.StatusUnprocessableEntity) // 422: permanent + })) + defer server.Close() + + const numItems = 5 + var discarded atomic.Int64 + + c := quickwit.NewClient(server.URL + "/api/v1/test") + c.SetConcurrent(1) + c.SetBatchSize(1) + c.OnDiscard(func(any) { discarded.Add(1) }) + + for i := 0; i < numItems; i++ { + c.Ingest(map[string]any{"index": i}) + } + + done := make(chan struct{}) + go func() { c.Close(); close(done) }() + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("Close did not return — worker wedged on permanent rejection") + } + + if n := discarded.Load(); n != numItems { + t.Errorf("OnDiscard fired %d times, want %d", n, numItems) + } +} + +// Core: removing the per-flush "encoded" slice must not change worker-side +// encode-failure handling — a poison fire-and-forget value is still discarded +// while the good value alongside it is delivered. +func TestIngest_WorkerEncodeFailureStillDiscarded(t *testing.T) { + var mu sync.Mutex + var received []int + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + mu.Lock() + received = append(received, parseIndices(body)...) 
+ mu.Unlock() + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + var discarded atomic.Int64 + + c := quickwit.NewClient(server.URL + "/api/v1/test") + c.SetConcurrent(1) + c.SetBatchSize(1000) // both items ride in one batch + c.SetMaxDelay(10 * time.Second) // flushed by Close + c.OnDiscard(func(any) { discarded.Add(1) }) + + c.Ingest( + map[string]any{"index": 0}, + map[string]any{"bad": make(chan int)}, // unencodable + map[string]any{"index": 2}, + ) + c.Close() + + mu.Lock() + defer mu.Unlock() + if len(received) != 2 || received[0] != 0 || received[1] != 2 { + t.Errorf("received = %v, want [0 2]", received) + } + if n := discarded.Load(); n != 1 { + t.Errorf("OnDiscard fired %d times, want 1 (the poison value)", n) + } +} diff --git a/quickwit.go b/quickwit.go index 5cde4f3..0a42074 100644 --- a/quickwit.go +++ b/quickwit.go @@ -44,6 +44,12 @@ const ( // ReasonClosed: the client was closed before the item could be accepted. // Transient; Nack so another instance handles it. ReasonClosed + // ReasonServer: the server permanently rejected the batch with a 4xx that + // retrying cannot fix (e.g. 400/422 bad document, 401/403 auth, 404 wrong + // index, 409 conflict). Inspect the cause: a bad document should be + // dead-lettered and Acked; a misconfiguration must be fixed (Nacking would + // redeliver forever). 5xx and 413 are not this — they stay retryable. + ReasonServer ) func (r DiscardReason) String() string { @@ -54,11 +60,31 @@ func (r DiscardReason) String() string { return "buffer_full" case ReasonClosed: return "closed" + case ReasonServer: + return "server" default: return "unknown" } } +// permanentIngestStatus reports whether an HTTP status from the ingest endpoint +// is a permanent rejection that retrying the same batch cannot fix. 5xx, 408, +// 425, 429 and 413 are deliberately excluded — they stay retryable (413 has its +// own auto-reduce path). 
+func permanentIngestStatus(code int) bool { + switch code { + case http.StatusBadRequest, // 400 + http.StatusUnauthorized, // 401 + http.StatusForbidden, // 403 + http.StatusNotFound, // 404 + http.StatusConflict, // 409 + http.StatusUnprocessableEntity: // 422 + return true + default: + return false + } +} + // IngestError is returned by IngestSync and IngestReceipt.Wait when an item was // dropped by the client before the server durably accepted it. A wrapped // context error (not an *IngestError) instead means "not confirmed": the item @@ -534,15 +560,15 @@ func (c *Client) loop() { buf.Reset() - // encoded holds the items written into the body, to settle on HTTP 200. // encodeFailures can only ever hold fire-and-forget items (ack == nil): // tracked items carry a pre-encoded raw line, so they cannot fail here. - encoded := make([]ingestItem, 0, len(batch)) + // On HTTP 200 we settle the whole batch directly — settle(nil) is a no-op + // for the ack==nil encode failures — so no separate "encoded" slice is + // allocated per flush. var encodeFailures []ingestItem for _, it := range batch { if it.raw != nil { buf.Write(it.raw) - encoded = append(encoded, it) continue } if err := jsonEnc.Encode(it.data); err != nil { @@ -550,7 +576,6 @@ func (c *Client) loop() { encodeFailures = append(encodeFailures, it) continue } - encoded = append(encoded, it) } // All items were unencodable — discard them and report success so the @@ -622,28 +647,38 @@ func (c *Client) loop() { "resetAfter", resetBatchSizeAfter.Format(time.RFC3339), ) } + return false + } + + // Permanent rejection: retrying the same batch cannot succeed, so + // settle/discard it and report it handled. Otherwise the worker would + // loop forever in retryFlush and, with the default 2 workers, a couple + // of poison batches would freeze all ingest. 
+ if permanentIngestStatus(resp.StatusCode) { + err := &IngestError{ + Reason: ReasonServer, + Err: fmt.Errorf("quickwit: ingest rejected with status %s", resp.Status), + } + for _, it := range batch { + c.discardItem(it, err) + } + return true } + // Retryable (5xx, 408, 425, 429, transport errors handled above). return false } - // HTTP 200 is the only durable-acceptance point: settle every tracked - // item that made it into the body (no-op for fire-and-forget items), - // then discard items that failed encoding. - for _, it := range encoded { + // HTTP 200 is the only durable-acceptance point: settle every item in the + // batch (settle is a no-op for fire-and-forget and encode-failure items, + // which have ack == nil), then discard items that failed encoding. + for _, it := range batch { it.ack.settle(nil) } for _, it := range encodeFailures { c.invokeOnDiscard(it.data) } - if !resetBatchSizeAfter.IsZero() && time.Now().After(resetBatchSizeAfter) { - beforeSize := batchSize - batchSize = c.getBatchSize() - resetBatchSizeAfter = time.Time{} - slog.Info("quickwit: reset batch size", "batchSize", batchSize, "old", beforeSize) - } - return true } @@ -749,6 +784,20 @@ func (c *Client) loop() { } } + // maybeResetBatchSize restores the batch size once the post-413 reduction + // window has elapsed. It is driven by the ticker rather than the flush + // success path so it fires even when traffic goes quiet — an idle buffer + // never calls flush, so a success-only reset could leave the batch + // permanently shrunk after a transient 413 spike. 
+ maybeResetBatchSize := func() { + if !resetBatchSizeAfter.IsZero() && time.Now().After(resetBatchSizeAfter) { + beforeSize := batchSize + batchSize = c.getBatchSize() + resetBatchSizeAfter = time.Time{} + slog.Info("quickwit: reset batch size", "batchSize", batchSize, "old", beforeSize) + } + } + ticker := time.NewTicker(c.getMaxDelay()) defer ticker.Stop() @@ -763,6 +812,7 @@ func (c *Client) loop() { for { select { case <-ticker.C: + maybeResetBatchSize() flushOversize() if len(buffer) == 0 { hasTracked = false