Add Len() int to the Provider interface; it returns the current queue depth for observability and test assertions. Bump launcher and logz to v1.0.1 and the Go directive to 1.26. The API is now committed as stable.
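A minimal sketch of the kind of test assertion Len() enables (hypothetical: newTestLogger is an assumed helper returning a logz.Logger, imports of context and testing are assumed, and OnStart is deliberately not called so dispatched tasks stay queued):

    func TestLenReportsQueueDepth(t *testing.T) {
        pool := worker.New(newTestLogger(t), worker.Config{PoolSize: 2, BufferSize: 10})

        // Workers are not running yet, so both tasks sit in the buffer.
        pool.Dispatch(func(ctx context.Context) error { return nil })
        pool.Dispatch(func(ctx context.Context) error { return nil })

        if got := pool.Len(); got != 2 {
            t.Fatalf("Len() = %d, want 2", got)
        }
    }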

135 lines · 3.3 KiB · Go

package worker

import (
    "context"
    "sync"
    "time"

    "code.nochebuena.dev/go/launcher"
    "code.nochebuena.dev/go/logz"
)

// Task is a unit of work executed asynchronously by the worker pool.
type Task func(ctx context.Context) error

// Provider dispatches tasks to the pool.
type Provider interface {
    // Dispatch queues a task. Returns false if the queue is full (backpressure).
    Dispatch(task Task) bool
    // Len returns the current number of tasks waiting in the queue.
    Len() int
}

// Component adds lifecycle management to Provider.
type Component interface {
    launcher.Component
    Provider
}

// Config holds worker pool settings.
type Config struct {
    // PoolSize is the number of concurrent workers. Default: 5.
    PoolSize int `env:"WORKER_POOL_SIZE" envDefault:"5"`
    // BufferSize is the task queue capacity. Default: 100.
    BufferSize int `env:"WORKER_BUFFER_SIZE" envDefault:"100"`
    // TaskTimeout is the maximum duration for a single task. Zero = no timeout.
    TaskTimeout time.Duration `env:"WORKER_TASK_TIMEOUT" envDefault:"0s"`
    // ShutdownTimeout is how long OnStop waits for workers to drain. Default: 30s.
    ShutdownTimeout time.Duration `env:"WORKER_SHUTDOWN_TIMEOUT" envDefault:"30s"`
}

type workerComponent struct {
    logger    logz.Logger
    cfg       Config
    taskQueue chan Task
    wg        sync.WaitGroup
    ctx       context.Context
    cancel    context.CancelFunc
}

// New returns a worker Component. Call lc.Append(pool) to manage its lifecycle.
func New(logger logz.Logger, cfg Config) Component {
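    // Guard against a zero-value Config: these fallbacks mirror the
    // envDefault tags above.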
    if cfg.PoolSize <= 0 {
        cfg.PoolSize = 5
    }
    if cfg.BufferSize <= 0 {
        cfg.BufferSize = 100
    }
    ctx, cancel := context.WithCancel(context.Background())
    return &workerComponent{
        logger:    logger,
        cfg:       cfg,
        taskQueue: make(chan Task, cfg.BufferSize),
        ctx:       ctx,
        cancel:    cancel,
    }
}

func (w *workerComponent) OnInit() error {
    w.logger.Info("worker: initializing pool",
        "pool_size", w.cfg.PoolSize,
        "buffer_size", w.cfg.BufferSize)
    return nil
}

func (w *workerComponent) OnStart() error {
    w.logger.Info("worker: starting workers")
    for i := 0; i < w.cfg.PoolSize; i++ {
        w.wg.Add(1)
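        // Pass i by value so each goroutine keeps its own worker id.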
        go func(id int) {
            defer w.wg.Done()
            w.runWorker(id)
        }(i)
    }
    return nil
}

func (w *workerComponent) OnStop() error {
    w.logger.Info("worker: stopping, draining queue")
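    // Close the queue first so workers drain everything already enqueued;
    // cancel the base context so long-running tasks are signalled to wind down.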
    close(w.taskQueue)
    w.cancel()

    done := make(chan struct{})
    go func() { w.wg.Wait(); close(done) }()

    timeout := w.cfg.ShutdownTimeout
    if timeout == 0 {
        timeout = 30 * time.Second
    }
    select {
    case <-done:
        w.logger.Info("worker: all workers stopped cleanly")
    case <-time.After(timeout):
        w.logger.Error("worker: shutdown timeout reached; some workers may still be running", nil)
    }
    return nil
}
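
// Len reports the number of tasks currently buffered in the queue. It is a
// point-in-time snapshot and may be stale by the time the caller reads it.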
func (w *workerComponent) Len() int { return len(w.taskQueue) }
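
// Dispatch uses a non-blocking send, so a full queue rejects the task rather
// than stalling the caller. Note that dispatching after OnStop has closed the
// queue panics; stop producers before stopping the pool.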
func (w *workerComponent) Dispatch(task Task) bool {
    select {
    case w.taskQueue <- task:
        return true
    default:
        w.logger.Error("worker: queue full, task dropped", nil)
        return false
    }
}
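
// runWorker consumes tasks until the queue is closed and drained. Each task
// runs with the pool's base context, narrowed to TaskTimeout when configured.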
func (w *workerComponent) runWorker(id int) {
    for task := range w.taskQueue {
        ctx := w.ctx
        var cancel context.CancelFunc
        if w.cfg.TaskTimeout > 0 {
            ctx, cancel = context.WithTimeout(ctx, w.cfg.TaskTimeout)
        } else {
            cancel = func() {}
        }
        if err := task(ctx); err != nil {
            w.logger.Error("worker: task failed", err, "worker_id", id)
        }
        cancel()
    }
}