Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 172 additions & 0 deletions ingest_reliability_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package quickwit_test

import (
"context"
"errors"
"io"
"net/http"
"net/http/httptest"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/moonrhythm/quickwit"
)

// Core: a permanent 4xx is reported as ReasonServer promptly, not retried until
// the context deadline.
func TestIngestSync_PermanentStatusReturnsServer(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
io.Copy(io.Discard, r.Body)
w.WriteHeader(http.StatusBadRequest) // 400: permanent
}))
defer server.Close()

c := quickwit.NewClient(server.URL + "/api/v1/test")
c.SetConcurrent(1)
defer c.Close()

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

start := time.Now()
err := c.IngestSync(ctx, map[string]any{"index": 0})
elapsed := time.Since(start)

var ie *quickwit.IngestError
if !errors.As(err, &ie) || ie.Reason != quickwit.ReasonServer {
t.Fatalf("err = %v, want *IngestError{ReasonServer}", err)
}
if elapsed > 2*time.Second {
t.Errorf("took %v, want a prompt permanent verdict (not a ctx timeout)", elapsed)
}
}

// Core: a permanent rejection must not wedge the worker — a second call still
// gets a verdict rather than blocking behind an infinite retry of the first.
func TestIngestSync_PermanentStatusDoesNotWedgeWorker(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
io.Copy(io.Discard, r.Body)
w.WriteHeader(http.StatusNotFound) // 404: permanent (e.g. wrong index)
}))
defer server.Close()

c := quickwit.NewClient(server.URL + "/api/v1/test")
c.SetConcurrent(1) // a single worker: if the first call wedges it, the second hangs
defer c.Close()

for i := 0; i < 2; i++ {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
err := c.IngestSync(ctx, map[string]any{"index": i})
cancel()
var ie *quickwit.IngestError
if !errors.As(err, &ie) || ie.Reason != quickwit.ReasonServer {
t.Fatalf("call %d: err = %v, want *IngestError{ReasonServer}", i, err)
}
}
}

// Core: a 5xx stays retryable — it must NOT be treated as a permanent rejection.
func TestIngestSync_ServerErrorIsRetriedNotPermanent(t *testing.T) {
var attempts atomic.Int64
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
io.Copy(io.Discard, r.Body)
if attempts.Add(1) < 3 {
w.WriteHeader(http.StatusInternalServerError) // transient
return
}
w.WriteHeader(http.StatusOK)
}))
defer server.Close()

c := quickwit.NewClient(server.URL + "/api/v1/test")
c.SetConcurrent(1)
c.SetBatchSize(1)
defer c.Close()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

if err := c.IngestSync(ctx, map[string]any{"index": 0}); err != nil {
t.Fatalf("IngestSync returned %v, want nil after retries", err)
}
if n := attempts.Load(); n < 3 {
t.Errorf("server saw %d attempts, want >= 3 (two 500s then success)", n)
}
}

// Core: fire-and-forget items on a permanent rejection are surfaced via
// OnDiscard and do not wedge the worker; Close returns promptly.
func TestIngest_PermanentStatusDiscardsFireAndForget(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
io.Copy(io.Discard, r.Body)
w.WriteHeader(http.StatusUnprocessableEntity) // 422: permanent
}))
defer server.Close()

const numItems = 5
var discarded atomic.Int64

c := quickwit.NewClient(server.URL + "/api/v1/test")
c.SetConcurrent(1)
c.SetBatchSize(1)
c.OnDiscard(func(any) { discarded.Add(1) })

for i := 0; i < numItems; i++ {
c.Ingest(map[string]any{"index": i})
}

done := make(chan struct{})
go func() { c.Close(); close(done) }()
select {
case <-done:
case <-time.After(5 * time.Second):
t.Fatal("Close did not return — worker wedged on permanent rejection")
}

if n := discarded.Load(); n != numItems {
t.Errorf("OnDiscard fired %d times, want %d", n, numItems)
}
}

// Core: removing the per-flush "encoded" slice must not change worker-side
// encode-failure handling — a poison fire-and-forget value is still discarded
// while the good value alongside it is delivered.
func TestIngest_WorkerEncodeFailureStillDiscarded(t *testing.T) {
var mu sync.Mutex
var received []int

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, _ := io.ReadAll(r.Body)
mu.Lock()
received = append(received, parseIndices(body)...)
mu.Unlock()
w.WriteHeader(http.StatusOK)
}))
defer server.Close()

var discarded atomic.Int64

c := quickwit.NewClient(server.URL + "/api/v1/test")
c.SetConcurrent(1)
c.SetBatchSize(1000) // both items ride in one batch
c.SetMaxDelay(10 * time.Second) // flushed by Close
c.OnDiscard(func(any) { discarded.Add(1) })

c.Ingest(
map[string]any{"index": 0},
map[string]any{"bad": make(chan int)}, // unencodable
map[string]any{"index": 2},
)
c.Close()

mu.Lock()
defer mu.Unlock()
if len(received) != 2 || received[0] != 0 || received[1] != 2 {
t.Errorf("received = %v, want [0 2]", received)
}
if n := discarded.Load(); n != 1 {
t.Errorf("OnDiscard fired %d times, want 1 (the poison value)", n)
}
}
80 changes: 65 additions & 15 deletions quickwit.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ const (
// ReasonClosed: the client was closed before the item could be accepted.
// Transient; Nack so another instance handles it.
ReasonClosed
// ReasonServer: the server permanently rejected the batch with a 4xx that
// retrying cannot fix (e.g. 400/422 bad document, 401/403 auth, 404 wrong
// index, 409 conflict). Inspect the cause: a bad document should be
// dead-lettered and Acked; a misconfiguration must be fixed (Nacking would
// redeliver forever). 5xx and 413 are not this — they stay retryable.
ReasonServer
)

func (r DiscardReason) String() string {
Expand All @@ -54,11 +60,31 @@ func (r DiscardReason) String() string {
return "buffer_full"
case ReasonClosed:
return "closed"
case ReasonServer:
return "server"
default:
return "unknown"
}
}

// permanentIngestStatus reports whether an HTTP status from the ingest endpoint
// is a permanent rejection that retrying the same batch cannot fix. 5xx, 408,
// 425, 429 and 413 are deliberately excluded — they stay retryable (413 has its
// own auto-reduce path).
func permanentIngestStatus(code int) bool {
switch code {
case http.StatusBadRequest, // 400
http.StatusUnauthorized, // 401
http.StatusForbidden, // 403
http.StatusNotFound, // 404
http.StatusConflict, // 409
http.StatusUnprocessableEntity: // 422
return true
default:
return false
}
}

// IngestError is returned by IngestSync and IngestReceipt.Wait when an item was
// dropped by the client before the server durably accepted it. A wrapped
// context error (not an *IngestError) instead means "not confirmed": the item
Expand Down Expand Up @@ -534,23 +560,22 @@ func (c *Client) loop() {

buf.Reset()

// encoded holds the items written into the body, to settle on HTTP 200.
// encodeFailures can only ever hold fire-and-forget items (ack == nil):
// tracked items carry a pre-encoded raw line, so they cannot fail here.
encoded := make([]ingestItem, 0, len(batch))
// On HTTP 200 we settle the whole batch directly — settle(nil) is a no-op
// for the ack==nil encode failures — so no separate "encoded" slice is
// allocated per flush.
var encodeFailures []ingestItem
for _, it := range batch {
if it.raw != nil {
buf.Write(it.raw)
encoded = append(encoded, it)
continue
}
if err := jsonEnc.Encode(it.data); err != nil {
slog.Error("quickwit: failed to encode record, discarding", "error", err)
encodeFailures = append(encodeFailures, it)
continue
}
encoded = append(encoded, it)
}

// All items were unencodable — discard them and report success so the
Expand Down Expand Up @@ -622,28 +647,38 @@ func (c *Client) loop() {
"resetAfter", resetBatchSizeAfter.Format(time.RFC3339),
)
}
return false
}

// Permanent rejection: retrying the same batch cannot succeed, so
// settle/discard it and report it handled. Otherwise the worker would
// loop forever in retryFlush and, with the default 2 workers, a couple
// of poison batches would freeze all ingest.
if permanentIngestStatus(resp.StatusCode) {
err := &IngestError{
Reason: ReasonServer,
Err: fmt.Errorf("quickwit: ingest rejected with status %s", resp.Status),
}
for _, it := range batch {
c.discardItem(it, err)
}
return true
}

// Retryable (5xx, 408, 425, 429, transport errors handled above).
return false
}

// HTTP 200 is the only durable-acceptance point: settle every tracked
// item that made it into the body (no-op for fire-and-forget items),
// then discard items that failed encoding.
for _, it := range encoded {
// HTTP 200 is the only durable-acceptance point: settle every item in the
// batch (settle is a no-op for fire-and-forget and encode-failure items,
// which have ack == nil), then discard items that failed encoding.
for _, it := range batch {
it.ack.settle(nil)
}
for _, it := range encodeFailures {
c.invokeOnDiscard(it.data)
}

if !resetBatchSizeAfter.IsZero() && time.Now().After(resetBatchSizeAfter) {
beforeSize := batchSize
batchSize = c.getBatchSize()
resetBatchSizeAfter = time.Time{}
slog.Info("quickwit: reset batch size", "batchSize", batchSize, "old", beforeSize)
}

return true
}

Expand Down Expand Up @@ -749,6 +784,20 @@ func (c *Client) loop() {
}
}

// maybeResetBatchSize restores the batch size once the post-413 reduction
// window has elapsed. It is driven by the ticker rather than the flush
// success path so it fires even when traffic goes quiet — an idle buffer
// never calls flush, so a success-only reset could leave the batch
// permanently shrunk after a transient 413 spike.
maybeResetBatchSize := func() {
if !resetBatchSizeAfter.IsZero() && time.Now().After(resetBatchSizeAfter) {
beforeSize := batchSize
batchSize = c.getBatchSize()
resetBatchSizeAfter = time.Time{}
slog.Info("quickwit: reset batch size", "batchSize", batchSize, "old", beforeSize)
}
}

ticker := time.NewTicker(c.getMaxDelay())
defer ticker.Stop()

Expand All @@ -763,6 +812,7 @@ func (c *Client) loop() {
for {
select {
case <-ticker.C:
maybeResetBatchSize()
flushOversize()
if len(buffer) == 0 {
hasTracked = false
Expand Down
Loading