timewheel

A generic timer wheel for Go.

Requirements

Go 1.25 or later.

Installation

go get github.com/lib-x/timewheel

Quick Start

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/lib-x/timewheel"
)

func main() {
    tw, err := timewheel.New[string](
        100*time.Millisecond,
        60,
        func(msg string) {
            fmt.Println("fired:", msg)
        },
    )
    if err != nil {
        log.Fatal(err)
    }

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    if err := tw.Start(ctx); err != nil {
        log.Fatal(err)
    }
    defer tw.Close()

    id, err := tw.AddTimer(500*time.Millisecond, "hello")
    if err != nil {
        log.Fatal(err)
    }

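    if fireAt, ok := tw.NextFireTime(id); ok {
        fmt.Printf("'hello' fires in %s\n", time.Until(fireAt).Round(time.Millisecond))
    }

    // Keep main alive long enough for the timer to fire before the deferred Close runs.
    time.Sleep(time.Second)
}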

API

Construction

func New[T any](
    interval   time.Duration,
    slotNum    int,
    defaultJob Job[T],
    opts       ...Option[T],
) (*TimeWheel[T], error)

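interval is the tick resolution. Delays shorter than one interval are rounded up to one tick. slotNum is the number of slots (buckets) in the wheel. A delay that spans more than slotNum ticks is handled by a per-timer circle count, which records how many full rotations must pass before the timer fires.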

Lifecycle

func (tw *TimeWheel[T]) Start(ctx context.Context) error
func (tw *TimeWheel[T]) Stop() error
func (tw *TimeWheel[T]) Close() error
func (tw *TimeWheel[T]) Wait()

Lifecycle is explicit:

new -> running -> closed
  • Start(nil) returns ErrNilContext.
  • Start succeeds at most once.
  • Starting an already running wheel returns ErrRunning.
  • Starting a closed wheel returns ErrClosed.
  • Stop before Start returns ErrNotStarted.
  • Stop is idempotent after the wheel is running or closed.
  • Close is idempotent, stops the wheel, and waits for the event loop and worker pool.
  • After Close returns, PendingTimers is empty and Stats().Pending is zero.
  • Canceling the context passed to Start stops the wheel.

Timer registration requires a running wheel. Add* and RemoveTimer return ErrNotStarted before Start and ErrClosed after shutdown begins.
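
The lifecycle rules above can be modeled as a small state machine. This is a minimal sketch using only the standard library, not the package's implementation: the lifecycle type, its methods, and the lowercase error values are hypothetical stand-ins, and it assumes that Stop moves a running wheel to the closed state. It is also not goroutine-safe.

```go
package main

import (
	"errors"
	"fmt"
)

// Illustrative stand-ins for the sentinel errors named in this section.
var (
	errRunning    = errors.New("running")
	errClosed     = errors.New("closed")
	errNotStarted = errors.New("not started")
)

type state int

const (
	stateNew state = iota
	stateRunning
	stateClosed
)

// lifecycle is a hypothetical model of the new -> running -> closed rules.
type lifecycle struct{ s state }

func (l *lifecycle) start() error {
	switch l.s {
	case stateRunning:
		return errRunning
	case stateClosed:
		return errClosed
	}
	l.s = stateRunning
	return nil
}

func (l *lifecycle) stop() error {
	if l.s == stateNew {
		return errNotStarted
	}
	l.s = stateClosed // assumption: stop closes; repeated calls are no-ops
	return nil
}

func main() {
	var l lifecycle
	fmt.Println(l.stop())  // not started
	fmt.Println(l.start()) // <nil>
	fmt.Println(l.start()) // running
	fmt.Println(l.stop())  // <nil>
	fmt.Println(l.stop())  // <nil>
	fmt.Println(l.start()) // closed
}
```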

Timer IDs

type TimerID uint64

All timer APIs use TimerID instead of raw integers.

One-Shot Timers

type Job[T any] func(T)
type JobContext[T any] func(context.Context, T) error

func (tw *TimeWheel[T]) AddTimer(delay time.Duration, data T) (TimerID, error)
func (tw *TimeWheel[T]) AddTimerWithJob(delay time.Duration, data T, job Job[T]) (TimerID, error)
func (tw *TimeWheel[T]) AddTimerWithContextJob(delay time.Duration, data T, job JobContext[T]) (TimerID, error)
func (tw *TimeWheel[T]) AddTimerFunc(delay time.Duration, fn func()) (TimerID, error)

Passing a nil per-timer job returns ErrNilJob. A nil default job is allowed only when every timer provides its own job. If a timer fires without any job, the wheel removes the timer and, when a logger is configured, logs a warning.

Repeating Timers

type RepeatMode uint8

const (
    FixedRate RepeatMode = iota
    FixedDelay
    SkipIfRunning
)

type RepeatOptions struct {
    Mode RepeatMode
}

func (tw *TimeWheel[T]) AddRepeatingTimer(delay time.Duration, data T, opts RepeatOptions) (TimerID, error)
func (tw *TimeWheel[T]) AddRepeatingTimerWithJob(delay time.Duration, data T, job Job[T], opts RepeatOptions) (TimerID, error)
func (tw *TimeWheel[T]) AddRepeatingTimerWithContextJob(delay time.Duration, data T, job JobContext[T], opts RepeatOptions) (TimerID, error)

Repeat modes:

  • FixedRate: schedules the next fire when the current fire is dispatched. Jobs may overlap.
  • FixedDelay: waits for the previous job to return, then waits the delay. Jobs do not overlap.
  • SkipIfRunning: keeps a fixed-rate cadence but skips a fire if the previous job is still running.

FixedRate and SkipIfRunning stay anchored to the schedule grid: occurrence n is scheduled for start + n*delay, so per-dispatch lateness does not accumulate into long-term drift. When the wheel falls more than one full period behind (for example after a stall), missed periods are skipped and the timer realigns to the next future grid point instead of firing a burst. JobEvent.Lateness reports how far behind an execution started. FixedDelay intentionally re-anchors at job completion time.

The zero-value RepeatOptions{} uses FixedRate.

Removing Timers

func (tw *TimeWheel[T]) RemoveTimer(id TimerID) error

Unknown, already-fired, and already-removed timer IDs are successful no-ops. After RemoveTimer returns nil, the wheel will not dispatch any future not-yet-started execution for that timer.

RemoveTimer does not cancel a job that has already started. If a job needs to observe wheel shutdown, register it as a JobContext so it receives a context it can watch.

Observability

type JobEvent[T any] struct {
    TimerID      TimerID
    Data         T
    StartedAt    time.Time
    FinishedAt   time.Time
    ScheduledFor time.Time
    Lateness     time.Duration
    Duration     time.Duration
    Err          error
    Panic        any
    Dropped      bool
    Skipped      bool
}

type JobObserver[T any] func(JobEvent[T])

func WithJobObserver[T any](observer JobObserver[T]) Option[T]
func WithErrorHandler[T any](h func(recovered any)) Option[T]
func WithLogger[T any](l Logger) Option[T]
func WithClock[T any](clk Clock) Option[T]

JobContext errors are reported through JobEvent.Err. Panics are recovered when an error handler or observer is configured. Without either, a panic keeps normal Go behavior and crashes the program.
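
The conditional recovery rule can be sketched as follows. This is an illustration of the described behavior under stated assumptions, not the package's code: runJob and its onPanic parameter are hypothetical names.

```go
package main

import "fmt"

// runJob is a hypothetical helper. It recovers a panic only when a handler
// is configured; with a nil handler the panic propagates as usual.
func runJob(job func(), onPanic func(recovered any)) {
	if onPanic != nil {
		defer func() {
			if r := recover(); r != nil {
				onPanic(r)
			}
		}()
	}
	job()
}

func main() {
	runJob(
		func() { panic("boom") },
		func(recovered any) { fmt.Println("recovered:", recovered) },
	)
	fmt.Println("still running")
}
```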

WithClock overrides the wheel's time source through the Clock and Ticker interfaces. It exists so tests can drive the wheel deterministically with a fake clock.

Worker Pool

type BackpressurePolicy uint8

const (
    Block BackpressurePolicy = iota
    Drop
    RunInline
)

func WithWorkerPool[T any](workers int, queueSize int, policy BackpressurePolicy) Option[T]

workers <= 0 disables the pool and runs jobs in independent goroutines. queueSize bounds the worker queue when the pool is enabled.

Backpressure policies:

  • Block: wait for queue capacity unless shutdown starts.
  • Drop: record a dropped job and do not run it when the queue is full.
  • RunInline: run the job on the event loop when the queue is full.

RunInline preserves execution but can delay ticks. When a repeating execution is dropped, the timer remains active and schedules its next attempt according to its repeat mode.
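
The three policies map naturally onto Go's select statement. The following is a minimal sketch of that idea, not the package's dispatcher: the submit function, the lowercase policy constants, and the string results are hypothetical and exist only for illustration.

```go
package main

import "fmt"

type policy uint8

const (
	block policy = iota
	drop
	runInline
)

// submit hands job to the queue according to p and reports what happened.
// done is closed when shutdown starts. Hypothetical helper.
func submit(queue chan func(), done <-chan struct{}, p policy, job func()) string {
	switch p {
	case drop:
		select {
		case queue <- job:
			return "queued"
		default:
			return "dropped" // queue full: do not run the job
		}
	case runInline:
		select {
		case queue <- job:
			return "queued"
		default:
			job() // queue full: run on the caller's goroutine
			return "inline"
		}
	default: // block
		select {
		case queue <- job:
			return "queued"
		case <-done:
			return "shutdown" // stop waiting once shutdown starts
		}
	}
}

func main() {
	queue := make(chan func(), 1) // capacity 1 and no workers, so it fills at once
	done := make(chan struct{})
	noop := func() {}

	fmt.Println(submit(queue, done, drop, noop))      // queued
	fmt.Println(submit(queue, done, drop, noop))      // dropped
	fmt.Println(submit(queue, done, runInline, noop)) // inline
	close(done)
	fmt.Println(submit(queue, done, block, noop)) // shutdown
}
```

In the RunInline branch the job runs on the submitting goroutine, which in the wheel is the event loop. That is why a slow inline job can delay ticks.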

Stats

type Stats struct {
    Pending  int64
    Executed int64
    Removed  int64
    Queued   int64
    Running  int64
    Dropped  int64
    Skipped  int64
}

Pending counts timers currently waiting in wheel slots. It does not count jobs waiting in the worker queue or jobs already running.

Inspecting Timers

func (tw *TimeWheel[T]) NextFireTime(id TimerID) (time.Time, bool)
func (tw *TimeWheel[T]) PendingTimers() []TimerInfo
func (tw *TimeWheel[T]) Stats() Stats

NextFireTime and PendingTimers are snapshots. They are estimates based on the wheel state when queried, not hard real-time guarantees. Actual dispatch happens no earlier than the scheduled time and can be delayed by up to one tick plus runtime scheduling jitter.

For FixedDelay timers, there is no pending next fire while the previous job is still running; the next fire is scheduled after that job returns.

Design Notes

Time Wheel Placement

ticks  = ceil(delay / interval)
offset = ticks - 1
circle = offset / slotNum
pos    = (currentPos + offset) % slotNum

The event loop scans one slot on every tick, dispatches due tasks, then advances the wheel pointer.

Timing Guarantees

Dispatch happens no earlier than the scheduled time and can be delayed by up to one tick plus runtime scheduling jitter. The wheel position advances one slot per received tick: if the event loop stalls long enough that the runtime ticker drops ticks (for example under the Block backpressure policy or a slow RunInline job), the wheel falls behind wall-clock time and dispatches late rather than compensating. Grid anchoring keeps repeating timers from accumulating that lag into permanent drift, and JobEvent.Lateness makes it observable.

Concurrency Model

All slot and index mutations are serialized onto the event loop goroutine through an internal command channel; wheel slots are accessed without locks. Add* and RemoveTimer block until the event loop acknowledges the command, so a timer is queryable through NextFireTime as soon as its Add* call returns. Inspection APIs read a shared index guarded by an RWMutex.
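
A miniature version of this model, assuming a single command type and a trivial index, might look like the sketch below. None of these names (loop, addCmd, add, has) come from the package; they are hypothetical and only illustrate the command channel, the acknowledgment, and the RWMutex-guarded index.

```go
package main

import (
	"fmt"
	"sync"
)

// addCmd asks the loop to register a timer and carries an ack channel.
type addCmd struct {
	id  uint64
	ack chan struct{}
}

// loop is a hypothetical miniature of the model described above.
type loop struct {
	cmds  chan addCmd
	slot  []uint64 // owned by run; never touched from other goroutines
	mu    sync.RWMutex
	index map[uint64]bool // shared with readers, guarded by mu
}

func newLoop() *loop {
	l := &loop{cmds: make(chan addCmd), index: make(map[uint64]bool)}
	go l.run()
	return l
}

func (l *loop) run() {
	for c := range l.cmds {
		l.slot = append(l.slot, c.id) // no lock needed: single owner
		l.mu.Lock()
		l.index[c.id] = true
		l.mu.Unlock()
		close(c.ack) // acknowledge only after the index is updated
	}
}

// add blocks until the event loop has applied the command.
func (l *loop) add(id uint64) {
	ack := make(chan struct{})
	l.cmds <- addCmd{id: id, ack: ack}
	<-ack
}

func (l *loop) has(id uint64) bool {
	l.mu.RLock()
	defer l.mu.RUnlock()
	return l.index[id]
}

func main() {
	l := newLoop()
	l.add(7)
	fmt.Println(l.has(7)) // true: visible as soon as add returns
	close(l.cmds)         // stop the loop
}
```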

Deletion Complexity

The wheel keeps a TimerID -> slot/index location index. RemoveTimer uses the index to find the timer in O(1), then removes it from the slot with swap-and-shrink. When another task is swapped into the removed position, its index is updated immediately.
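
Swap-and-shrink removal with a location index can be sketched as follows. The wheel and location types here are hypothetical and store only IDs. They illustrate the technique rather than reproduce the package's data structures.

```go
package main

import "fmt"

type TimerID uint64

// location records where a timer lives: which slot and which position in it.
type location struct{ slot, index int }

// wheel is a hypothetical, stripped-down structure for illustration.
type wheel struct {
	slots [][]TimerID
	index map[TimerID]location
}

func (w *wheel) add(slot int, id TimerID) {
	w.slots[slot] = append(w.slots[slot], id)
	w.index[id] = location{slot, len(w.slots[slot]) - 1}
}

func (w *wheel) remove(id TimerID) {
	loc, ok := w.index[id]
	if !ok {
		return // unknown or already removed: no-op
	}
	s := w.slots[loc.slot]
	last := len(s) - 1
	if loc.index != last {
		moved := s[last]
		s[loc.index] = moved                           // swap the last task into the hole
		w.index[moved] = location{loc.slot, loc.index} // fix its index entry
	}
	w.slots[loc.slot] = s[:last] // shrink by one
	delete(w.index, id)
}

func main() {
	w := &wheel{slots: make([][]TimerID, 4), index: map[TimerID]location{}}
	w.add(0, 1)
	w.add(0, 2)
	w.add(0, 3)
	w.remove(1)
	fmt.Println(w.slots[0], w.index[3]) // [3 2] {0 0}
}
```

The removal does not preserve order within a slot, which is acceptable here because every task in a slot is checked on the same tick.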

Core Scope

The core package handles delay, repeat, cancel, execution, and inspection. Cron expressions, persistent scheduling, orchestration, and business-key mapping belong in separate packages layered on top.

Keyed Scheduler

The scheduler subpackage provides a keyed dynamic scheduler built on top of the core time wheel. It keeps cron-like, calendar, and business schedule logic outside the core package by requiring callers to provide the next-run calculation.

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/lib-x/timewheel/scheduler"
)

func main() {
    s, err := scheduler.NewScheduler[string, string](
        scheduler.Options[string, string]{
            Next: func(now time.Time, key string, data string) (time.Time, bool, error) {
                return now.Add(time.Minute), true, nil
            },
            Run: func(ctx context.Context, key string, data string) error {
                fmt.Println("run", key, data)
                return nil
            },
        },
        scheduler.WithReschedulePolicy(scheduler.RescheduleAfterFinish),
        scheduler.WithWheel(time.Second, 3600),
    )
    if err != nil {
        log.Fatal(err)
    }

    if err := s.Upsert(scheduler.Item[string, string]{
        Key:  "daily-report",
        Data: "payload",
    }); err != nil {
        log.Fatal(err)
    }

    if err := s.Start(context.Background()); err != nil {
        log.Fatal(err)
    }
    defer s.Close()
}

Scheduler features:

  • Upsert, ReplaceAll, and Remove manage items by key.
  • Snapshot returns pending, running, disabled, and invalid runtime state.
  • NextFunc is the only place that calculates the next execution time.
  • Generation tracking prevents stale timers and stale completions from rescheduling removed or replaced items.
  • CancelRunningOnRemove, CancelRunningOnReplace, WaitRunningOnClose, and RunTimeout control the lifecycle of running jobs and can be configured with functional options.
  • WithClock injects a fake clock into the scheduler and its wheel for deterministic tests.
  • RescheduleAfterFinish avoids self-overlap, RescheduleBeforeRun supports a fixed cadence, and NoAutoReschedule leaves rescheduling to the caller.

License

MIT
