package worker

import (
	"context"
	"sync"
	"time"

	"code.nochebuena.dev/einherjar/contracts/logging"
	"code.nochebuena.dev/einherjar/contracts/observability"
)

// Compile-time interface verification (I-8 / CT-5).
var _ Component = (*workerImpl)(nil)
var _ observability.Identifiable = (*workerImpl)(nil)

// New returns a Component backed by the given config.
// The goroutine pool is not started until OnStart is called.
//
// Non-positive PoolSize and BufferSize fall back to 5 and 100 respectively.
func New(logger logging.Logger, cfg Config) Component {
	if cfg.PoolSize <= 0 {
		cfg.PoolSize = 5
	}
	if cfg.BufferSize <= 0 {
		cfg.BufferSize = 100
	}

	ctx, cancel := context.WithCancel(context.Background())
	return &workerImpl{
		logger:    logger,
		cfg:       cfg,
		taskQueue: make(chan Task, cfg.BufferSize),
		ctx:       ctx,
		cancel:    cancel,
	}
}

// workerImpl is a fixed-size goroutine pool that consumes tasks from a
// buffered queue.
type workerImpl struct {
	logger    logging.Logger
	cfg       Config
	taskQueue chan Task
	wg        sync.WaitGroup

	// ctx is the parent context handed to every task; cancel is invoked by
	// OnStop.
	ctx    context.Context
	cancel context.CancelFunc

	// mu guards stopped and serializes close(taskQueue) in OnStop against
	// sends in Dispatch, so a send can never hit a closed channel.
	mu      sync.RWMutex
	stopped bool
}

// OnInit logs the effective pool configuration. It never fails.
func (w *workerImpl) OnInit() error {
	w.logger.Info("worker: initializing pool",
		"pool_size", w.cfg.PoolSize,
		"buffer_size", w.cfg.BufferSize)
	return nil
}

// OnStart launches cfg.PoolSize worker goroutines, each tracked by the
// WaitGroup so OnStop can wait for them. It never fails.
func (w *workerImpl) OnStart() error {
	w.logger.Info("worker: starting workers")
	for i := 0; i < w.cfg.PoolSize; i++ {
		w.wg.Add(1)
		go func(id int) {
			defer w.wg.Done()
			w.runWorker(id)
		}(i)
	}
	return nil
}

// OnStop closes the task queue, cancels the shared context, and waits for the
// workers to exit, for at most cfg.ShutdownTimeout (30s when the configured
// value is not positive). Tasks still queued are executed, but with an
// already-cancelled context. It always returns nil; a timeout is only logged.
//
// OnStop is safe to call more than once: the queue is closed only on the
// first call.
func (w *workerImpl) OnStop() error {
	w.logger.Info("worker: stopping, draining queue")

	w.mu.Lock()
	if !w.stopped {
		w.stopped = true
		// Closing under the write lock guarantees no Dispatch is mid-send.
		close(w.taskQueue)
	}
	w.mu.Unlock()
	w.cancel()

	done := make(chan struct{})
	go func() {
		w.wg.Wait()
		close(done)
	}()

	timeout := w.cfg.ShutdownTimeout
	if timeout <= 0 {
		timeout = 30 * time.Second
	}
	// An explicit timer is released as soon as the workers finish, rather
	// than staying pending for the full timeout.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-done:
		w.logger.Info("worker: all workers stopped cleanly")
	case <-timer.C:
		w.logger.Error("worker: shutdown timeout reached; some workers may still be running", nil)
	}
	return nil
}

// Len reports the number of tasks currently waiting in the queue.
func (w *workerImpl) Len() int {
	return len(w.taskQueue)
}

// Dispatch enqueues task without blocking. It returns true when the task was
// accepted, and false when the task is nil, the pool has been stopped, or the
// queue is full; each rejection is logged.
func (w *workerImpl) Dispatch(task Task) bool {
	if task == nil {
		// A nil task would panic inside a worker goroutine when invoked.
		w.logger.Error("worker: nil task rejected", nil)
		return false
	}

	w.mu.RLock()
	stopped := w.stopped
	queued := false
	if !stopped {
		select {
		case w.taskQueue <- task:
			queued = true
		default:
		}
	}
	w.mu.RUnlock()

	switch {
	case queued:
		return true
	case stopped:
		w.logger.Error("worker: pool stopped, task dropped", nil)
	default:
		w.logger.Error("worker: queue full, task dropped", nil)
	}
	return false
}

// runWorker consumes tasks until the queue is closed and drained, logging any
// task error together with the worker id.
func (w *workerImpl) runWorker(id int) {
	for task := range w.taskQueue {
		if err := w.runTask(task); err != nil {
			w.logger.Error("worker: task failed", err, "worker_id", id)
		}
	}
}

// runTask executes a single task with the pool context, bounded by
// cfg.TaskTimeout when that value is positive.
func (w *workerImpl) runTask(task Task) error {
	ctx := w.ctx
	if w.cfg.TaskTimeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, w.cfg.TaskTimeout)
		// Deferred so the timeout's resources are released on every exit path.
		defer cancel()
	}
	return task(ctx)
}