# worker

Concurrent background worker pool with launcher lifecycle integration.

## Purpose

`worker` provides a fixed-size goroutine pool that receives tasks via a buffered
channel. It integrates with `launcher` for managed startup and graceful shutdown,
and with `logz` for structured logging. Tasks are plain `func(ctx context.Context) error`
callables: no task struct, no registration, no reflection.

## Tier & Dependencies

**Tier:** 3 (depends on Tier 1 `logz` and Tier 2 `launcher`)
**Module:** `code.nochebuena.dev/go/worker`
**Direct imports:** `code.nochebuena.dev/go/launcher`, `code.nochebuena.dev/go/logz`

Note: `worker` imports `logz.Logger` directly (not duck-typed). This follows the
framework-to-framework direct import rule from global ADR-001: `logz` is a peer
framework module, not an app-provided dependency.

## Key Design Decisions

- **Drain-with-timeout shutdown** (ADR-001): `OnStop` closes the queue channel,
  cancels the pool context, then waits for all goroutines with a `ShutdownTimeout`
  deadline (default 30 s). If the deadline expires, the timeout is logged as an
  error; `OnStop` returns `nil` either way so the launcher continues.
- **Per-task timeout** (ADR-002): When `TaskTimeout > 0`, each worker goroutine
  creates a `context.WithTimeout` child before calling the task. Zero means no
  deadline is imposed.
- **Channel-based queue** (ADR-003): A single buffered `chan Task` is shared by all
  workers. `Dispatch` is non-blocking: it returns `false` immediately when the
  buffer is full (backpressure). Workers `range` over the channel; closing it during
  `OnStop` is the drain signal.

## Patterns

**Constructing and registering:**

```go
pool := worker.New(logger, worker.Config{
	PoolSize:        5,
	BufferSize:      100,
	TaskTimeout:     5 * time.Second,
	ShutdownTimeout: 30 * time.Second,
})
lc.Append(pool)
```

**Dispatching a task:**

```go
ok := pool.Dispatch(func(ctx context.Context) error {
	return sendEmail(ctx, msg)
})
if !ok {
	// queue was full; the caller decides how to handle it
}
```

**Interface surface:**

- `worker.Task`: `func(ctx context.Context) error`
- `worker.Provider`: `Dispatch(Task) bool` (no lifecycle)
- `worker.Component`: embeds `launcher.Component` + `Provider` (full lifecycle)

Inject `Provider` into callers that only dispatch; inject `Component` into the
launcher registration site.

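As a rough illustration of the injection advice, the sketch below shows a caller that depends only on the dispatch side. `Task` and `Provider` are redeclared locally so the example compiles on its own; real code would use `worker.Task` and `worker.Provider`. `Mailer`, `SendWelcome`, and `inlineProvider` are hypothetical names invented for this example.

```go
package main

import (
	"context"
	"fmt"
)

// Task and Provider are local copies of the shapes listed in this README,
// declared here only so the example is self-contained.
type Task func(ctx context.Context) error

type Provider interface {
	Dispatch(Task) bool
}

// Mailer is a hypothetical caller that only dispatches, so it takes a
// Provider and never sees the lifecycle methods.
type Mailer struct {
	tasks Provider
}

// SendWelcome queues the email and reports whether the pool accepted it.
func (m *Mailer) SendWelcome(addr string) bool {
	return m.tasks.Dispatch(func(ctx context.Context) error {
		fmt.Println("sending welcome email to", addr)
		return nil
	})
}

// inlineProvider is a hypothetical test double that runs each task
// synchronously instead of queueing it.
type inlineProvider struct{ ran int }

func (p *inlineProvider) Dispatch(t Task) bool {
	p.ran++
	return t(context.Background()) == nil
}

func main() {
	fake := &inlineProvider{}
	m := &Mailer{tasks: fake}
	accepted := m.SendWelcome("user@example.com")
	fmt.Println("accepted:", accepted, "tasks run:", fake.ran)
}
```

Depending on the narrow interface keeps callers free of `OnStart`/`OnStop` concerns and lets their tests substitute a trivial double without starting a pool.
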
## What to Avoid

- Do not call `Dispatch` after `OnStop` has been called. Sending on a closed channel
  panics. The launcher lifecycle guarantees ordering, but callers that hold a
  reference and dispatch asynchronously must respect this.
- Do not assume that a task will eventually execute after `Dispatch` returns
  `false`. It has been dropped. Implement retry or overflow handling at the call
  site if loss is unacceptable.
- Do not use `worker` as a request-scoped concurrency primitive (e.g. fan-out within
  a single HTTP handler). It is designed for background jobs, not intra-request
  parallelism.
- Do not add a method to `Task` (e.g. `ID`, `Priority`). Keep tasks as plain
  functions. Priority queues or named tasks belong in a separate, application-specific
  abstraction built on top of `Provider`.

## Testing Notes

- `compliance_test.go` has a compile-time assertion: `var _ worker.Component = worker.New(...)`.
  This ensures `New` returns a `Component` and the signature stays stable.
- `worker_test.go` tests dispatch, draining, timeout, and pool size via the real
  implementation; no mocks are needed.
- All config fields have sane defaults applied in `New`: `PoolSize <= 0` → 5,
  `BufferSize <= 0` → 100. Tests that want deterministic behaviour should set
  explicit values.
- The pool goroutines start in `OnStart`, not in `New`. Tests that call `Dispatch`
  without going through the launcher must call `OnInit` and `OnStart` first, or
  use the launcher directly.
