diff --git a/ingest_reliability_test.go b/ingest_reliability_test.go new file mode 100644 index 0000000..c3945db --- /dev/null +++ b/ingest_reliability_test.go @@ -0,0 +1,172 @@ +package quickwit_test + +import ( + "context" + "errors" + "io" + "net/http" + "net/http/httptest" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/moonrhythm/quickwit" +) + +// Core: a permanent 4xx is reported as ReasonServer promptly, not retried until +// the context deadline. +func TestIngestSync_PermanentStatusReturnsServer(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + io.Copy(io.Discard, r.Body) + w.WriteHeader(http.StatusBadRequest) // 400: permanent + })) + defer server.Close() + + c := quickwit.NewClient(server.URL + "/api/v1/test") + c.SetConcurrent(1) + defer c.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + start := time.Now() + err := c.IngestSync(ctx, map[string]any{"index": 0}) + elapsed := time.Since(start) + + var ie *quickwit.IngestError + if !errors.As(err, &ie) || ie.Reason != quickwit.ReasonServer { + t.Fatalf("err = %v, want *IngestError{ReasonServer}", err) + } + if elapsed > 2*time.Second { + t.Errorf("took %v, want a prompt permanent verdict (not a ctx timeout)", elapsed) + } +} + +// Core: a permanent rejection must not wedge the worker — a second call still +// gets a verdict rather than blocking behind an infinite retry of the first. +func TestIngestSync_PermanentStatusDoesNotWedgeWorker(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + io.Copy(io.Discard, r.Body) + w.WriteHeader(http.StatusNotFound) // 404: permanent (e.g. wrong index) + })) + defer server.Close() + + c := quickwit.NewClient(server.URL + "/api/v1/test") + c.SetConcurrent(1) // a single worker: if the first call wedges it, the second hangs + defer c.Close() + + for i := 0; i < 2; i++ { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + err := c.IngestSync(ctx, map[string]any{"index": i}) + cancel() + var ie *quickwit.IngestError + if !errors.As(err, &ie) || ie.Reason != quickwit.ReasonServer { + t.Fatalf("call %d: err = %v, want *IngestError{ReasonServer}", i, err) + } + } +} + +// Core: a 5xx stays retryable — it must NOT be treated as a permanent rejection. +func TestIngestSync_ServerErrorIsRetriedNotPermanent(t *testing.T) { + var attempts atomic.Int64 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + io.Copy(io.Discard, r.Body) + if attempts.Add(1) < 3 { + w.WriteHeader(http.StatusInternalServerError) // transient + return + } + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + c := quickwit.NewClient(server.URL + "/api/v1/test") + c.SetConcurrent(1) + c.SetBatchSize(1) + defer c.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := c.IngestSync(ctx, map[string]any{"index": 0}); err != nil { + t.Fatalf("IngestSync returned %v, want nil after retries", err) + } + if n := attempts.Load(); n < 3 { + t.Errorf("server saw %d attempts, want >= 3 (two 500s then success)", n) + } +} + +// Core: fire-and-forget items on a permanent rejection are surfaced via +// OnDiscard and do not wedge the worker; Close returns promptly. +func TestIngest_PermanentStatusDiscardsFireAndForget(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + io.Copy(io.Discard, r.Body) + w.WriteHeader(http.StatusUnprocessableEntity) // 422: permanent + })) + defer server.Close() + + const numItems = 5 + var discarded atomic.Int64 + + c := quickwit.NewClient(server.URL + "/api/v1/test") + c.SetConcurrent(1) + c.SetBatchSize(1) + c.OnDiscard(func(any) { discarded.Add(1) }) + + for i := 0; i < numItems; i++ { + c.Ingest(map[string]any{"index": i}) + } + + done := make(chan struct{}) + go func() { c.Close(); close(done) }() + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("Close did not return — worker wedged on permanent rejection") + } + + if n := discarded.Load(); n != numItems { + t.Errorf("OnDiscard fired %d times, want %d", n, numItems) + } +} + +// Core: removing the per-flush "encoded" slice must not change worker-side +// encode-failure handling — a poison fire-and-forget value is still discarded +// while the good value alongside it is delivered. +func TestIngest_WorkerEncodeFailureStillDiscarded(t *testing.T) { + var mu sync.Mutex + var received []int + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + mu.Lock() + received = append(received, parseIndices(body)...) + mu.Unlock() + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + var discarded atomic.Int64 + + c := quickwit.NewClient(server.URL + "/api/v1/test") + c.SetConcurrent(1) + c.SetBatchSize(1000) // both items ride in one batch + c.SetMaxDelay(10 * time.Second) // flushed by Close + c.OnDiscard(func(any) { discarded.Add(1) }) + + c.Ingest( + map[string]any{"index": 0}, + map[string]any{"bad": make(chan int)}, // unencodable + map[string]any{"index": 2}, + ) + c.Close() + + mu.Lock() + defer mu.Unlock() + if len(received) != 2 || received[0] != 0 || received[1] != 2 { + t.Errorf("received = %v, want [0 2]", received) + } + if n := discarded.Load(); n != 1 { + t.Errorf("OnDiscard fired %d times, want 1 (the poison value)", n) + } +} diff --git a/quickwit.go b/quickwit.go index 5cde4f3..0a42074 100644 --- a/quickwit.go +++ b/quickwit.go @@ -44,6 +44,12 @@ const ( // ReasonClosed: the client was closed before the item could be accepted. // Transient; Nack so another instance handles it. ReasonClosed + // ReasonServer: the server permanently rejected the batch with a 4xx that + // retrying cannot fix (e.g. 400/422 bad document, 401/403 auth, 404 wrong + // index, 409 conflict). Inspect the cause: a bad document should be + // dead-lettered and Acked; a misconfiguration must be fixed (Nacking would + // redeliver forever). 5xx and 413 are not this — they stay retryable. + ReasonServer ) func (r DiscardReason) String() string { @@ -54,11 +60,31 @@ func (r DiscardReason) String() string { return "buffer_full" case ReasonClosed: return "closed" + case ReasonServer: + return "server" default: return "unknown" } } +// permanentIngestStatus reports whether an HTTP status from the ingest endpoint +// is a permanent rejection that retrying the same batch cannot fix. 5xx, 408, +// 425, 429 and 413 are deliberately excluded — they stay retryable (413 has its +// own auto-reduce path). +func permanentIngestStatus(code int) bool { + switch code { + case http.StatusBadRequest, // 400 + http.StatusUnauthorized, // 401 + http.StatusForbidden, // 403 + http.StatusNotFound, // 404 + http.StatusConflict, // 409 + http.StatusUnprocessableEntity: // 422 + return true + default: + return false + } +} + // IngestError is returned by IngestSync and IngestReceipt.Wait when an item was // dropped by the client before the server durably accepted it. A wrapped // context error (not an *IngestError) instead means "not confirmed": the item @@ -534,15 +560,15 @@ func (c *Client) loop() { buf.Reset() - // encoded holds the items written into the body, to settle on HTTP 200. // encodeFailures can only ever hold fire-and-forget items (ack == nil): // tracked items carry a pre-encoded raw line, so they cannot fail here. - encoded := make([]ingestItem, 0, len(batch)) + // On HTTP 200 we settle the whole batch directly — settle(nil) is a no-op + // for the ack==nil encode failures — so no separate "encoded" slice is + // allocated per flush. var encodeFailures []ingestItem for _, it := range batch { if it.raw != nil { buf.Write(it.raw) - encoded = append(encoded, it) continue } if err := jsonEnc.Encode(it.data); err != nil { @@ -550,7 +576,6 @@ func (c *Client) loop() { encodeFailures = append(encodeFailures, it) continue } - encoded = append(encoded, it) } // All items were unencodable — discard them and report success so the @@ -622,28 +647,38 @@ func (c *Client) loop() { "resetAfter", resetBatchSizeAfter.Format(time.RFC3339), ) } + return false + } + + // Permanent rejection: retrying the same batch cannot succeed, so + // settle/discard it and report it handled. Otherwise the worker would + // loop forever in retryFlush and, with the default 2 workers, a couple + // of poison batches would freeze all ingest. + if permanentIngestStatus(resp.StatusCode) { + err := &IngestError{ + Reason: ReasonServer, + Err: fmt.Errorf("quickwit: ingest rejected with status %s", resp.Status), + } + for _, it := range batch { + c.discardItem(it, err) + } + return true } + // Retryable (5xx, 408, 425, 429, transport errors handled above). return false } - // HTTP 200 is the only durable-acceptance point: settle every tracked - // item that made it into the body (no-op for fire-and-forget items), - // then discard items that failed encoding. - for _, it := range encoded { + // HTTP 200 is the only durable-acceptance point: settle every item in the + // batch (settle is a no-op for fire-and-forget and encode-failure items, + // which have ack == nil), then discard items that failed encoding. + for _, it := range batch { it.ack.settle(nil) } for _, it := range encodeFailures { c.invokeOnDiscard(it.data) } - if !resetBatchSizeAfter.IsZero() && time.Now().After(resetBatchSizeAfter) { - beforeSize := batchSize - batchSize = c.getBatchSize() - resetBatchSizeAfter = time.Time{} - slog.Info("quickwit: reset batch size", "batchSize", batchSize, "old", beforeSize) - } - return true } @@ -749,6 +784,20 @@ func (c *Client) loop() { } } + // maybeResetBatchSize restores the batch size once the post-413 reduction + // window has elapsed. It is driven by the ticker rather than the flush + // success path so it fires even when traffic goes quiet — an idle buffer + // never calls flush, so a success-only reset could leave the batch + // permanently shrunk after a transient 413 spike. + maybeResetBatchSize := func() { + if !resetBatchSizeAfter.IsZero() && time.Now().After(resetBatchSizeAfter) { + beforeSize := batchSize + batchSize = c.getBatchSize() + resetBatchSizeAfter = time.Time{} + slog.Info("quickwit: reset batch size", "batchSize", batchSize, "old", beforeSize) + } + } + ticker := time.NewTicker(c.getMaxDelay()) defer ticker.Stop() @@ -763,6 +812,7 @@ func (c *Client) loop() { for { select { case <-ticker.C: + maybeResetBatchSize() flushOversize() if len(buffer) == 0 { hasTracked = false