111 lines
2.4 KiB
Go
111 lines
2.4 KiB
Go
|
|
package worker
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"sync"
|
||
|
|
"time"
|
||
|
|
|
||
|
|
"code.nochebuena.dev/einherjar/contracts/logging"
|
||
|
|
"code.nochebuena.dev/einherjar/contracts/observability"
|
||
|
|
)
|
||
|
|
|
||
|
|
// Compile-time interface verification (I-8 / CT-5).
|
||
|
|
var _ Component = (*workerImpl)(nil)
|
||
|
|
var _ observability.Identifiable = (*workerImpl)(nil)
|
||
|
|
|
||
|
|
// New returns a Component backed by the given config.
|
||
|
|
// The goroutine pool is not started until OnStart is called.
|
||
|
|
func New(logger logging.Logger, cfg Config) Component {
|
||
|
|
if cfg.PoolSize <= 0 {
|
||
|
|
cfg.PoolSize = 5
|
||
|
|
}
|
||
|
|
if cfg.BufferSize <= 0 {
|
||
|
|
cfg.BufferSize = 100
|
||
|
|
}
|
||
|
|
ctx, cancel := context.WithCancel(context.Background())
|
||
|
|
return &workerImpl{
|
||
|
|
logger: logger,
|
||
|
|
cfg: cfg,
|
||
|
|
taskQueue: make(chan Task, cfg.BufferSize),
|
||
|
|
ctx: ctx,
|
||
|
|
cancel: cancel,
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
type workerImpl struct {
|
||
|
|
logger logging.Logger
|
||
|
|
cfg Config
|
||
|
|
taskQueue chan Task
|
||
|
|
wg sync.WaitGroup
|
||
|
|
ctx context.Context
|
||
|
|
cancel context.CancelFunc
|
||
|
|
}
|
||
|
|
|
||
|
|
func (w *workerImpl) OnInit() error {
|
||
|
|
w.logger.Info("worker: initializing pool",
|
||
|
|
"pool_size", w.cfg.PoolSize,
|
||
|
|
"buffer_size", w.cfg.BufferSize)
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (w *workerImpl) OnStart() error {
|
||
|
|
w.logger.Info("worker: starting workers")
|
||
|
|
for i := 0; i < w.cfg.PoolSize; i++ {
|
||
|
|
w.wg.Add(1)
|
||
|
|
go func(id int) {
|
||
|
|
defer w.wg.Done()
|
||
|
|
w.runWorker(id)
|
||
|
|
}(i)
|
||
|
|
}
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (w *workerImpl) OnStop() error {
|
||
|
|
w.logger.Info("worker: stopping, draining queue")
|
||
|
|
close(w.taskQueue)
|
||
|
|
w.cancel()
|
||
|
|
|
||
|
|
done := make(chan struct{})
|
||
|
|
go func() { w.wg.Wait(); close(done) }()
|
||
|
|
|
||
|
|
timeout := w.cfg.ShutdownTimeout
|
||
|
|
if timeout == 0 {
|
||
|
|
timeout = 30 * time.Second
|
||
|
|
}
|
||
|
|
select {
|
||
|
|
case <-done:
|
||
|
|
w.logger.Info("worker: all workers stopped cleanly")
|
||
|
|
case <-time.After(timeout):
|
||
|
|
w.logger.Error("worker: shutdown timeout reached; some workers may still be running", nil)
|
||
|
|
}
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
// Len reports how many tasks are currently buffered in the queue, i.e. sent
// to the channel but not yet received by a worker.
func (w *workerImpl) Len() int {
	pending := len(w.taskQueue)
	return pending
}
|
||
|
|
|
||
|
|
func (w *workerImpl) Dispatch(task Task) bool {
|
||
|
|
select {
|
||
|
|
case w.taskQueue <- task:
|
||
|
|
return true
|
||
|
|
default:
|
||
|
|
w.logger.Error("worker: queue full, task dropped", nil)
|
||
|
|
return false
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (w *workerImpl) runWorker(id int) {
|
||
|
|
for task := range w.taskQueue {
|
||
|
|
ctx := w.ctx
|
||
|
|
var cancel context.CancelFunc
|
||
|
|
if w.cfg.TaskTimeout > 0 {
|
||
|
|
ctx, cancel = context.WithTimeout(ctx, w.cfg.TaskTimeout)
|
||
|
|
} else {
|
||
|
|
cancel = func() {}
|
||
|
|
}
|
||
|
|
if err := task(ctx); err != nil {
|
||
|
|
w.logger.Error("worker: task failed", err, "worker_id", id)
|
||
|
|
}
|
||
|
|
cancel()
|
||
|
|
}
|
||
|
|
}
|