From 631c98396e128d7948841faa6dabf746838693ba Mon Sep 17 00:00:00 2001 From: Rene Nochebuena Date: Thu, 19 Mar 2026 13:13:41 +0000 Subject: [PATCH] docs(worker): correct tier from 2 to 3 and fix dependency tier refs worker depends on launcher (now correctly Tier 2) and logz (Tier 1), placing it at Tier 3. The previous docs cited launcher as Tier 1 and logz as Tier 0, both of which were wrong. --- .devcontainer/devcontainer.json | 26 +++ .gitignore | 38 ++++ CHANGELOG.md | 29 +++ CLAUDE.md | 96 +++++++++ LICENSE | 21 ++ compliance_test.go | 8 + doc.go | 8 + .../ADR-001-drain-with-timeout-shutdown.md | 45 ++++ docs/adr/ADR-002-per-task-timeout.md | 46 ++++ docs/adr/ADR-003-channel-task-queue.md | 53 +++++ go.mod | 8 + go.sum | 4 + worker.go | 130 +++++++++++ worker_test.go | 201 ++++++++++++++++++ 14 files changed, 713 insertions(+) create mode 100644 .devcontainer/devcontainer.json create mode 100644 .gitignore create mode 100644 CHANGELOG.md create mode 100644 CLAUDE.md create mode 100644 LICENSE create mode 100644 compliance_test.go create mode 100644 doc.go create mode 100644 docs/adr/ADR-001-drain-with-timeout-shutdown.md create mode 100644 docs/adr/ADR-002-per-task-timeout.md create mode 100644 docs/adr/ADR-003-channel-task-queue.md create mode 100644 go.mod create mode 100644 go.sum create mode 100644 worker.go create mode 100644 worker_test.go diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json new file mode 100644 index 0000000..54f5aae --- /dev/null +++ b/.devcontainer/devcontainer.json @@ -0,0 +1,26 @@ +{ + "name": "Go", + "image": "mcr.microsoft.com/devcontainers/go:2-1.25-trixie", + "features": { + "ghcr.io/devcontainers-extra/features/claude-code:1": {} + }, + "forwardPorts": [], + "postCreateCommand": "go version", + "customizations": { + "vscode": { + "settings": { + "files.autoSave": "afterDelay", + "files.autoSaveDelay": 1000, + "explorer.compactFolders": false, + "explorer.showEmptyFolders": true + }, + "extensions": [ + 
"golang.go", + "eamodio.golang-postfix-completion", + "quicktype.quicktype", + "usernamehw.errorlens" + ] + } + }, + "remoteUser": "vscode" +} \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..221da82 --- /dev/null +++ b/.gitignore @@ -0,0 +1,38 @@ +# Binaries +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with go test -c +*.test + +# Output of go build +*.out + +# Dependency directory +vendor/ + +# Go workspace file +go.work +go.work.sum + +# Environment files +.env +.env.* + +# Editor / IDE +.idea/ +.vscode/ +*.swp +*.swo +*~ + +# OS +.DS_Store +Thumbs.db + +# VCS files +COMMIT.md +RELEASE.md diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..8b48fd6 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,29 @@ +# Changelog + +All notable changes to this module will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this module adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 
+ +## [0.9.0] - 2026-03-18 + +### Added + +- `Task` type — `func(ctx context.Context) error`; the unit of work dispatched to the pool +- `Config` struct — pool settings loaded from environment variables: `WORKER_POOL_SIZE` (number of concurrent goroutines, default `5`), `WORKER_BUFFER_SIZE` (task queue channel capacity, default `100`), `WORKER_TASK_TIMEOUT` (per-task context deadline; `0` means no deadline, default `0s`), `WORKER_SHUTDOWN_TIMEOUT` (time to wait for workers to drain on stop, default `30s`) +- `Provider` interface — `Dispatch(task Task) bool`; for callers that only dispatch tasks; returns `false` immediately when the queue is full (backpressure, non-blocking) +- `Component` interface — embeds `launcher.Component` and `Provider`; the full lifecycle-managed surface registered with the launcher +- `New(logger logz.Logger, cfg Config) Component` — constructor; applies safe defaults (`PoolSize <= 0` → 5, `BufferSize <= 0` → 100); returns a `Component` ready for `lc.Append` +- `OnInit` — logs the pool configuration (`pool_size`, `buffer_size`); the buffered task channel itself is allocated in `New` +- `OnStart` — spawns `PoolSize` worker goroutines, each ranging over the task channel +- `OnStop` — closes the task channel (drain signal), cancels the pool context, then waits up to `ShutdownTimeout` for all goroutines to finish; logs an error on timeout but returns `nil` so the launcher continues +- Per-task timeout — when `TaskTimeout > 0`, each worker creates a `context.WithTimeout` child before invoking the task; tasks also receive a cancellation signal when the pool is stopping via the pool context +- Error logging — task errors are logged with the worker ID; errors are not surfaced to the dispatcher + +### Design Notes + +- A single buffered `chan Task` is shared by all workers; closing it during `OnStop` is the drain signal, avoiding a separate done channel or additional synchronisation primitives. 
+- `Dispatch` is deliberately non-blocking: a `false` return means the task has been dropped, not queued; the caller owns the retry or overflow decision, keeping backpressure handling out of the pool itself. +- `Provider` / `Component` split follows the framework pattern: inject `Provider` into callers that only dispatch tasks, inject `Component` only at the lifecycle registration site. + +[0.9.0]: https://code.nochebuena.dev/go/worker/releases/tag/v0.9.0 diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..5d3ee84 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,96 @@ +# worker + +Concurrent background worker pool with launcher lifecycle integration. + +## Purpose + +`worker` provides a fixed-size goroutine pool that receives tasks via a buffered +channel. It integrates with `launcher` for managed startup and graceful shutdown, +and with `logz` for structured logging. Tasks are plain `func(ctx context.Context) error` +callables — no task struct, no registration, no reflection. + +## Tier & Dependencies + +**Tier:** 3 (depends on Tier 1 `logz` and Tier 2 `launcher`) +**Module:** `code.nochebuena.dev/go/worker` +**Direct imports:** `code.nochebuena.dev/go/launcher`, `code.nochebuena.dev/go/logz` + +Note: `worker` imports `logz.Logger` directly (not duck-typed). This follows the +framework-to-framework direct import rule from global ADR-001: `logz` is a peer +framework module, not an app-provided dependency. + +## Key Design Decisions + +- **Drain-with-timeout shutdown** (ADR-001): `OnStop` closes the queue channel, + cancels the pool context, then waits for all goroutines with a `ShutdownTimeout` + deadline (default 30 s). Timeout is logged as an error; `OnStop` returns `nil` + regardless so the launcher continues. +- **Per-task timeout** (ADR-002): When `TaskTimeout > 0`, each worker goroutine + creates a `context.WithTimeout` child before calling the task. Zero means no + deadline is imposed. 
+- **Channel-based queue** (ADR-003): A single buffered `chan Task` is shared by all + workers. `Dispatch` is non-blocking — it returns `false` immediately when the + buffer is full (backpressure). Workers `range` over the channel; closing it during + `OnStop` is the drain signal. + +## Patterns + +**Constructing and registering:** + +```go +pool := worker.New(logger, worker.Config{ + PoolSize: 5, + BufferSize: 100, + TaskTimeout: 5 * time.Second, + ShutdownTimeout: 30 * time.Second, +}) +lc.Append(pool) +``` + +**Dispatching a task:** + +```go +ok := pool.Dispatch(func(ctx context.Context) error { + return sendEmail(ctx, msg) +}) +if !ok { + // queue was full — caller decides how to handle +} +``` + +**Interface surface:** + +- `worker.Task` — `func(ctx context.Context) error` +- `worker.Provider` — `Dispatch(Task) bool` (no lifecycle) +- `worker.Component` — embeds `launcher.Component` + `Provider` (full lifecycle) + +Inject `Provider` into callers that only dispatch; inject `Component` into the +launcher registration site. + +## What to Avoid + +- Do not call `Dispatch` after `OnStop` has been called. Sending on a closed channel + panics. The launcher lifecycle guarantees ordering, but callers that hold a + reference and dispatch asynchronously must respect this. +- Do not assume a `false` return from `Dispatch` means the task will eventually + execute. It has been dropped. Implement retry or overflow handling at the call site + if loss is unacceptable. +- Do not use `worker` as a request-scoped concurrency primitive (e.g. fan-out within + a single HTTP handler). It is designed for background jobs, not intra-request + parallelism. +- Do not add a method to `Task` (e.g. `ID`, `Priority`). Keep tasks as plain + functions. Priority queues or named tasks belong in a separate, application-specific + abstraction built on top of `Provider`. + +## Testing Notes + +- `compliance_test.go` has a compile-time assertion: `var _ worker.Component = worker.New(...)`. 
+ This ensures `New` returns a `Component` and the signature stays stable. +- `worker_test.go` tests dispatch, draining, timeout, and pool size via the real + implementation — no mocks needed. +- All config fields have sane defaults applied in `New`: `PoolSize <= 0` → 5, + `BufferSize <= 0` → 100. Tests that want deterministic behaviour should set + explicit values. +- The pool goroutines start in `OnStart`, not in `New`. Tests that call `Dispatch` + without going through the launcher must call `OnInit` and `OnStart` first, or + use the launcher directly. diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..0b33b48 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2026 NOCHEBUENADEV + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. 
diff --git a/compliance_test.go b/compliance_test.go new file mode 100644 index 0000000..ed7d6e5 --- /dev/null +++ b/compliance_test.go @@ -0,0 +1,8 @@ +package worker_test + +import ( + "code.nochebuena.dev/go/logz" + "code.nochebuena.dev/go/worker" +) + +var _ worker.Component = worker.New(logz.New(logz.Options{}), worker.Config{}) diff --git a/doc.go b/doc.go new file mode 100644 index 0000000..9a43c41 --- /dev/null +++ b/doc.go @@ -0,0 +1,8 @@ +// Package worker provides a concurrent worker pool with launcher lifecycle integration. +// +// Usage: +// +// pool := worker.New(logger, worker.Config{PoolSize: 5, BufferSize: 100}) +// lc.Append(pool) +// pool.Dispatch(func(ctx context.Context) error { return doWork(ctx) }) +package worker diff --git a/docs/adr/ADR-001-drain-with-timeout-shutdown.md b/docs/adr/ADR-001-drain-with-timeout-shutdown.md new file mode 100644 index 0000000..68bc305 --- /dev/null +++ b/docs/adr/ADR-001-drain-with-timeout-shutdown.md @@ -0,0 +1,45 @@ +# ADR-001: Drain-with-Timeout Shutdown + +**Status:** Accepted +**Date:** 2026-03-18 + +## Context + +A worker pool that stops abruptly risks silently dropping tasks that were already +queued but not yet picked up by a goroutine. Conversely, waiting indefinitely for +workers to finish is unsafe in production: a stuck task would prevent the process +from exiting, blocking rolling deploys and causing orchestrators to send SIGKILL. + +The `launcher` lifecycle protocol gives each component an `OnStop` hook. The worker +pool must use that hook to drain cleanly while guaranteeing a bounded exit time. + +## Decision + +`OnStop` performs a three-step drain sequence: + +1. **Close the task queue channel** (`close(w.taskQueue)`). This signals every + goroutine that is `range`-ing over the channel to exit once the buffer is empty. + No new tasks can be dispatched after this point — `Dispatch` would panic on a + send to a closed channel, but by the time `OnStop` runs the service is already + shutting down. 
+2. **Cancel the pool context** (`w.cancel()`). Any task currently executing that + respects its `ctx` argument will receive a cancellation signal and can return + early. +3. **Wait with a timeout**. A goroutine calls `w.wg.Wait()` and closes a `done` + channel. `OnStop` then selects between `done` and `time.After(ShutdownTimeout)`. + If `ShutdownTimeout` is zero the implementation falls back to 30 seconds. On + timeout, an error is logged but `OnStop` returns `nil` so the launcher can + continue shutting down other components. + +## Consequences + +- Tasks already in the queue at shutdown time are still handed to a worker (drain), but + because step 2 cancels the pool context right after the channel is closed, they run with an already-cancelled `ctx` and may return early if they honour it. Tasks that + were never dispatched are not run, and work queued behind a task stuck past the timeout may be lost. +- The 30-second default matches common Kubernetes `terminationGracePeriodSeconds` + defaults, making the behaviour predictable in containerised deployments. +- `ShutdownTimeout` is configurable via `WORKER_SHUTDOWN_TIMEOUT` so operators can + tune it per environment without code changes. +- `OnStop` always returns `nil`; a timeout is surfaced as a logged error, not a + returned error, so the launcher continues cleaning up other components even if + workers are stuck. diff --git a/docs/adr/ADR-002-per-task-timeout.md b/docs/adr/ADR-002-per-task-timeout.md new file mode 100644 index 0000000..9960c33 --- /dev/null +++ b/docs/adr/ADR-002-per-task-timeout.md @@ -0,0 +1,46 @@ +# ADR-002: Per-Task Timeout via Child Context + +**Status:** Accepted +**Date:** 2026-03-18 + +## Context + +Worker tasks can call external services, run database queries, or perform other +operations with unpredictable latency. A single slow or hung task occupying a +goroutine indefinitely degrades overall pool throughput. Without a bounded +execution time, one bad task can block a worker slot for the lifetime of the +process. 
+ +At the same time, a blanket timeout should not be imposed when callers have not +requested one — running with no timeout (polling or batch jobs) is a legitimate use case. + +## Decision + +`Config` exposes a `TaskTimeout time.Duration` field (env `WORKER_TASK_TIMEOUT`, +default `0s`). Each worker goroutine checks this value before calling a task: + +- If `TaskTimeout > 0`, a `context.WithTimeout(ctx, w.cfg.TaskTimeout)` child + context is created and its `cancel` function is kept to be called once the task returns. +- Otherwise (`TaskTimeout <= 0`), the pool root context is passed through unchanged and a + no-op cancel function is used. + +The task receives the (possibly deadline-bearing) context as its only `context.Context` +argument. It is the task's responsibility to respect cancellation; the pool does not +forcibly terminate goroutines. + +`cancel()` is called immediately after the task returns, regardless of whether the +task succeeded or failed, to release the timer resource promptly. + +## Consequences + +- Tasks that respect `ctx.Done()` or pass `ctx` to downstream calls are automatically + bounded by `TaskTimeout`. +- Tasks that ignore their context will not be forcibly killed; the timeout becomes a + best-effort signal only. This is a deliberate trade-off — Go offers no way to + forcibly stop a goroutine from the outside. +- Setting `TaskTimeout = 0` is a safe default: no deadline is added, and no timer + resource is allocated per task. +- `TaskTimeout` is independent of `ShutdownTimeout`. A task may have a 5-second + execution timeout while the pool allows 30 seconds to drain during shutdown. +- The timeout context is a child of the pool root context, so cancelling the pool + (via `OnStop`) also cancels any running task context, regardless of `TaskTimeout`. 
diff --git a/docs/adr/ADR-003-channel-task-queue.md b/docs/adr/ADR-003-channel-task-queue.md new file mode 100644 index 0000000..fbdc8f0 --- /dev/null +++ b/docs/adr/ADR-003-channel-task-queue.md @@ -0,0 +1,53 @@ +# ADR-003: Channel-Based Buffered Task Queue + +**Status:** Accepted +**Date:** 2026-03-18 + +## Context + +A worker pool requires a mechanism to hand off work from callers to goroutines. +Common options include a mutex-protected slice, a ring buffer, or a Go channel. +The pool must support multiple concurrent producers (callers of `Dispatch`) and +multiple concurrent consumers (worker goroutines), while providing a simple +backpressure signal when capacity is exhausted. + +## Decision + +The task queue is a buffered `chan Task` with capacity `Config.BufferSize` (env +`WORKER_BUFFER_SIZE`, default 100). All worker goroutines receive from the same +channel using `for task := range w.taskQueue`. Producers call `Dispatch` which +uses a non-blocking `select` with a `default` branch: + +```go +select { +case w.taskQueue <- task: + return true +default: + // queue full — log and return false + return false +} +``` + +`Dispatch` returns `bool`: `true` if the task was enqueued, `false` if the queue +was full. The caller decides what to do with a rejected task (retry, log, discard). + +Closing the channel in `OnStop` is the drain signal: `range` over a closed channel +drains buffered items and then exits naturally, so no separate "stop" message is +needed. + +## Consequences + +- The channel scheduler distributes tasks across all `PoolSize` goroutines without + any additional synchronisation code. +- Backpressure is explicit: a full queue returns `false` rather than blocking the + caller or growing unboundedly. Callers that must not drop tasks should implement + retry logic at their layer. +- Channel capacity is fixed at construction time. 
There is no dynamic resizing; if + workload consistently fills the buffer, `BufferSize` or `PoolSize` must be tuned + in config. +- Closing the channel is a one-way signal: once `OnStop` closes it, `Dispatch` must + not be called again. This is safe in practice because `launcher` ensures `OnStop` + is only called after the application has stopped dispatching work, but there is no + runtime guard against misuse. +- The `for range` pattern requires no sentinel values and is idiomatic Go for + fan-out worker pools. diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..fdce35e --- /dev/null +++ b/go.mod @@ -0,0 +1,8 @@ +module code.nochebuena.dev/go/worker + +go 1.25 + +require ( + code.nochebuena.dev/go/launcher v0.9.0 + code.nochebuena.dev/go/logz v0.9.0 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..16f1a9f --- /dev/null +++ b/go.sum @@ -0,0 +1,4 @@ +code.nochebuena.dev/go/launcher v0.9.0 h1:dJHonA9Xm03AQKK0919FJaQn9ZKHZ+RZfB9yxjnx3TA= +code.nochebuena.dev/go/launcher v0.9.0/go.mod h1:IBtntmbnyddukjEhxlc7Ysdzz9nZsnd9+8FzAIHt77g= +code.nochebuena.dev/go/logz v0.9.0 h1:wfV7vtI4V/8ED7Hm31Fbql7Y5iOGrlHN4X8Z5ajTZZE= +code.nochebuena.dev/go/logz v0.9.0/go.mod h1:qODhSbKb+tWE7rdhHLcKweiP5CgwIaWoZxadCT3bQV8= diff --git a/worker.go b/worker.go new file mode 100644 index 0000000..0a94cbc --- /dev/null +++ b/worker.go @@ -0,0 +1,130 @@ +package worker + +import ( + "context" + "sync" + "time" + + "code.nochebuena.dev/go/launcher" + "code.nochebuena.dev/go/logz" +) + +// Task is a unit of work executed asynchronously by the worker pool. +type Task func(ctx context.Context) error + +// Provider dispatches tasks to the pool. +type Provider interface { + // Dispatch queues a task. Returns false if the queue is full (backpressure). + Dispatch(task Task) bool +} + +// Component adds lifecycle management to Provider. +type Component interface { + launcher.Component + Provider +} + +// Config holds worker pool settings. 
+type Config struct { + // PoolSize is the number of concurrent workers. Default: 5. + PoolSize int `env:"WORKER_POOL_SIZE" envDefault:"5"` + // BufferSize is the task queue capacity. Default: 100. + BufferSize int `env:"WORKER_BUFFER_SIZE" envDefault:"100"` + // TaskTimeout is the maximum duration for a single task. Zero = no timeout. + TaskTimeout time.Duration `env:"WORKER_TASK_TIMEOUT" envDefault:"0s"` + // ShutdownTimeout is how long OnStop waits for workers to drain. Default: 30s. + ShutdownTimeout time.Duration `env:"WORKER_SHUTDOWN_TIMEOUT" envDefault:"30s"` +} + +type workerComponent struct { + logger logz.Logger + cfg Config + taskQueue chan Task + wg sync.WaitGroup + ctx context.Context + cancel context.CancelFunc +} + +// New returns a worker Component. Call lc.Append(pool) to manage its lifecycle. +func New(logger logz.Logger, cfg Config) Component { + if cfg.PoolSize <= 0 { + cfg.PoolSize = 5 + } + if cfg.BufferSize <= 0 { + cfg.BufferSize = 100 + } + ctx, cancel := context.WithCancel(context.Background()) + return &workerComponent{ + logger: logger, + cfg: cfg, + taskQueue: make(chan Task, cfg.BufferSize), + ctx: ctx, + cancel: cancel, + } +} + +func (w *workerComponent) OnInit() error { + w.logger.Info("worker: initializing pool", + "pool_size", w.cfg.PoolSize, + "buffer_size", w.cfg.BufferSize) + return nil +} + +func (w *workerComponent) OnStart() error { + w.logger.Info("worker: starting workers") + for i := 0; i < w.cfg.PoolSize; i++ { + w.wg.Add(1) + go func(id int) { + defer w.wg.Done() + w.runWorker(id) + }(i) + } + return nil +} + +func (w *workerComponent) OnStop() error { + w.logger.Info("worker: stopping, draining queue") + close(w.taskQueue) + w.cancel() + + done := make(chan struct{}) + go func() { w.wg.Wait(); close(done) }() + + timeout := w.cfg.ShutdownTimeout + if timeout == 0 { + timeout = 30 * time.Second + } + select { + case <-done: + w.logger.Info("worker: all workers stopped cleanly") + case <-time.After(timeout): + 
w.logger.Error("worker: shutdown timeout reached; some workers may still be running", nil) + } + return nil +} + +func (w *workerComponent) Dispatch(task Task) bool { + select { + case w.taskQueue <- task: + return true + default: + w.logger.Error("worker: queue full, task dropped", nil) + return false + } +} + +func (w *workerComponent) runWorker(id int) { + for task := range w.taskQueue { + ctx := w.ctx + var cancel context.CancelFunc + if w.cfg.TaskTimeout > 0 { + ctx, cancel = context.WithTimeout(ctx, w.cfg.TaskTimeout) + } else { + cancel = func() {} + } + if err := task(ctx); err != nil { + w.logger.Error("worker: task failed", err, "worker_id", id) + } + cancel() + } +} diff --git a/worker_test.go b/worker_test.go new file mode 100644 index 0000000..41cef4f --- /dev/null +++ b/worker_test.go @@ -0,0 +1,201 @@ +package worker + +import ( + "context" + "errors" + "sync" + "sync/atomic" + "testing" + "time" + + "code.nochebuena.dev/go/logz" +) + +func newLogger() logz.Logger { return logz.New(logz.Options{}) } + +func startWorker(t *testing.T, cfg Config) Component { + t.Helper() + c := New(newLogger(), cfg) + if err := c.OnInit(); err != nil { + t.Fatalf("OnInit: %v", err) + } + if err := c.OnStart(); err != nil { + t.Fatalf("OnStart: %v", err) + } + return c +} + +func TestNew(t *testing.T) { + if New(newLogger(), Config{}) == nil { + t.Fatal("New returned nil") + } +} + +func TestWorker_DispatchAndExecute(t *testing.T) { + done := make(chan struct{}) + c := startWorker(t, Config{PoolSize: 1, BufferSize: 10, ShutdownTimeout: time.Second}) + + c.Dispatch(func(ctx context.Context) error { + close(done) + return nil + }) + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("task not executed in time") + } + c.OnStop() +} + +func TestWorker_BackpressureFull(t *testing.T) { + // Block the single worker so the queue fills up. 
+ block := make(chan struct{}) + c := startWorker(t, Config{PoolSize: 1, BufferSize: 1, ShutdownTimeout: time.Second}) + + c.Dispatch(func(ctx context.Context) error { <-block; return nil }) // fills worker + c.Dispatch(func(ctx context.Context) error { return nil }) // fills buffer + + ok := c.Dispatch(func(ctx context.Context) error { return nil }) // should be dropped + if ok { + t.Error("expected Dispatch to return false when queue is full") + } + close(block) + c.OnStop() +} + +func TestWorker_OnStop_DrainsQueue(t *testing.T) { + var count int64 + c := startWorker(t, Config{PoolSize: 2, BufferSize: 50, ShutdownTimeout: 5 * time.Second}) + + for i := 0; i < 10; i++ { + c.Dispatch(func(ctx context.Context) error { + atomic.AddInt64(&count, 1) + return nil + }) + } + c.OnStop() + + if atomic.LoadInt64(&count) != 10 { + t.Errorf("expected 10 tasks completed, got %d", count) + } +} + +func TestWorker_OnStop_Timeout(t *testing.T) { + c := startWorker(t, Config{PoolSize: 1, BufferSize: 1, + ShutdownTimeout: 50 * time.Millisecond}) + + // Dispatch a task that blocks longer than ShutdownTimeout. + c.Dispatch(func(ctx context.Context) error { + time.Sleep(500 * time.Millisecond) + return nil + }) + + start := time.Now() + c.OnStop() + elapsed := time.Since(start) + + // OnStop should return after ~ShutdownTimeout, not after 500ms. 
+ if elapsed > 300*time.Millisecond { + t.Errorf("OnStop blocked too long: %v", elapsed) + } +} + +func TestWorker_TaskTimeout(t *testing.T) { + var ctxCancelled int64 + c := startWorker(t, Config{ + PoolSize: 1, BufferSize: 10, + TaskTimeout: 50 * time.Millisecond, + ShutdownTimeout: time.Second, + }) + + done := make(chan struct{}) + c.Dispatch(func(ctx context.Context) error { + defer close(done) + select { + case <-ctx.Done(): + atomic.StoreInt64(&ctxCancelled, 1) + case <-time.After(500 * time.Millisecond): + } + return nil + }) + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("task did not complete in time") + } + if atomic.LoadInt64(&ctxCancelled) != 1 { + t.Error("expected task context to be cancelled by TaskTimeout") + } + c.OnStop() +} + +func TestWorker_MultipleWorkers(t *testing.T) { + const n = 5 + var wg sync.WaitGroup + wg.Add(n) + started := make(chan struct{}, n) + + c := startWorker(t, Config{PoolSize: n, BufferSize: n, ShutdownTimeout: time.Second}) + + block := make(chan struct{}) + for i := 0; i < n; i++ { + c.Dispatch(func(ctx context.Context) error { + started <- struct{}{} + <-block + wg.Done() + return nil + }) + } + + // All n tasks should start concurrently. 
+ timer := time.NewTimer(time.Second) + defer timer.Stop() + for i := 0; i < n; i++ { + select { + case <-started: + case <-timer.C: + t.Fatalf("only %d/%d workers started concurrently", i, n) + } + } + close(block) + c.OnStop() +} + +func TestWorker_TaskError(t *testing.T) { + c := startWorker(t, Config{PoolSize: 1, BufferSize: 10, ShutdownTimeout: time.Second}) + + done := make(chan struct{}) + c.Dispatch(func(ctx context.Context) error { + defer close(done) + return errors.New("task error") + }) + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("task did not run") + } + c.OnStop() // should not panic +} + +func TestWorker_Lifecycle(t *testing.T) { + c := New(newLogger(), Config{PoolSize: 2, BufferSize: 10, ShutdownTimeout: time.Second}) + if err := c.OnInit(); err != nil { + t.Fatalf("OnInit: %v", err) + } + if err := c.OnStart(); err != nil { + t.Fatalf("OnStart: %v", err) + } + done := make(chan struct{}) + c.Dispatch(func(ctx context.Context) error { close(done); return nil }) + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("task not executed") + } + if err := c.OnStop(); err != nil { + t.Fatalf("OnStop: %v", err) + } +}