296 lines
7.1 KiB
Go
296 lines
7.1 KiB
Go
|
|
package worker
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"errors"
|
||
|
|
"go/ast"
|
||
|
|
"go/parser"
|
||
|
|
"go/token"
|
||
|
|
"os"
|
||
|
|
"path/filepath"
|
||
|
|
"strings"
|
||
|
|
"sync/atomic"
|
||
|
|
"testing"
|
||
|
|
"time"
|
||
|
|
|
||
|
|
"code.nochebuena.dev/einherjar/contracts/lifecycle"
|
||
|
|
"code.nochebuena.dev/einherjar/contracts/logging"
|
||
|
|
)
|
||
|
|
|
||
|
|
// Compile-time interface checks (CT-5 / I-8).
|
||
|
|
var _ lifecycle.Component = (Component)(nil)
|
||
|
|
var _ Provider = (Component)(nil)
|
||
|
|
|
||
|
|
// --- CT-6: at most one exported TypeSpec per non-test, non-doc file ---
|
||
|
|
|
||
|
|
func TestAtMostOneExportedTypePerFile(t *testing.T) {
|
||
|
|
fset := token.NewFileSet()
|
||
|
|
pkgs, err := parser.ParseDir(fset, ".", func(fi os.FileInfo) bool {
|
||
|
|
name := fi.Name()
|
||
|
|
return !strings.HasSuffix(name, "_test.go") && name != "doc.go"
|
||
|
|
}, 0)
|
||
|
|
if err != nil {
|
||
|
|
t.Fatalf("parse: %v", err)
|
||
|
|
}
|
||
|
|
for _, pkg := range pkgs {
|
||
|
|
for path, file := range pkg.Files {
|
||
|
|
base := filepath.Base(path)
|
||
|
|
count := 0
|
||
|
|
for _, decl := range file.Decls {
|
||
|
|
gd, ok := decl.(*ast.GenDecl)
|
||
|
|
if !ok {
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
for _, spec := range gd.Specs {
|
||
|
|
ts, ok := spec.(*ast.TypeSpec)
|
||
|
|
if ok && ts.Name.IsExported() {
|
||
|
|
count++
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
if count > 1 {
|
||
|
|
t.Errorf("%s: %d exported TypeSpecs (max 1)", base, count)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// --- Config defaults (S-4) ---
|
||
|
|
|
||
|
|
func TestDefaultConfig_OptionalFields(t *testing.T) {
|
||
|
|
cfg := DefaultConfig()
|
||
|
|
if cfg.PoolSize == 0 {
|
||
|
|
t.Error("PoolSize must have a default")
|
||
|
|
}
|
||
|
|
if cfg.BufferSize == 0 {
|
||
|
|
t.Error("BufferSize must have a default")
|
||
|
|
}
|
||
|
|
if cfg.ShutdownTimeout == 0 {
|
||
|
|
t.Error("ShutdownTimeout must have a default")
|
||
|
|
}
|
||
|
|
// TaskTimeout == 0 is intentional: means no per-task deadline
|
||
|
|
}
|
||
|
|
|
||
|
|
// --- New / lifecycle ---
|
||
|
|
|
||
|
|
func TestNew_NotNil(t *testing.T) {
|
||
|
|
if New(newLogger(), Config{}) == nil {
|
||
|
|
t.Fatal("New returned nil")
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestNew_AppliesDefaults(t *testing.T) {
|
||
|
|
c := New(newLogger(), Config{PoolSize: 0, BufferSize: 0}).(*workerImpl)
|
||
|
|
if c.cfg.PoolSize != 5 {
|
||
|
|
t.Errorf("PoolSize = %d, want 5", c.cfg.PoolSize)
|
||
|
|
}
|
||
|
|
if c.cfg.BufferSize != 100 {
|
||
|
|
t.Errorf("BufferSize = %d, want 100", c.cfg.BufferSize)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// --- Behavioral tests ---
|
||
|
|
|
||
|
|
func TestWorker_DispatchAndExecute(t *testing.T) {
|
||
|
|
done := make(chan struct{})
|
||
|
|
c := startWorker(t, Config{PoolSize: 1, BufferSize: 10, ShutdownTimeout: time.Second})
|
||
|
|
|
||
|
|
c.Dispatch(func(ctx context.Context) error {
|
||
|
|
close(done)
|
||
|
|
return nil
|
||
|
|
})
|
||
|
|
|
||
|
|
select {
|
||
|
|
case <-done:
|
||
|
|
case <-time.After(time.Second):
|
||
|
|
t.Fatal("task not executed in time")
|
||
|
|
}
|
||
|
|
_ = c.OnStop()
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestWorker_BackpressureFull(t *testing.T) {
|
||
|
|
block := make(chan struct{})
|
||
|
|
c := startWorker(t, Config{PoolSize: 1, BufferSize: 1, ShutdownTimeout: time.Second})
|
||
|
|
|
||
|
|
c.Dispatch(func(ctx context.Context) error { <-block; return nil }) // fills worker
|
||
|
|
c.Dispatch(func(ctx context.Context) error { return nil }) // fills buffer
|
||
|
|
|
||
|
|
ok := c.Dispatch(func(ctx context.Context) error { return nil }) // should be dropped
|
||
|
|
if ok {
|
||
|
|
t.Error("expected Dispatch to return false when queue is full")
|
||
|
|
}
|
||
|
|
close(block)
|
||
|
|
_ = c.OnStop()
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestWorker_OnStop_DrainsQueue(t *testing.T) {
|
||
|
|
var count int64
|
||
|
|
c := startWorker(t, Config{PoolSize: 2, BufferSize: 50, ShutdownTimeout: 5 * time.Second})
|
||
|
|
|
||
|
|
for i := 0; i < 10; i++ {
|
||
|
|
c.Dispatch(func(ctx context.Context) error {
|
||
|
|
atomic.AddInt64(&count, 1)
|
||
|
|
return nil
|
||
|
|
})
|
||
|
|
}
|
||
|
|
_ = c.OnStop()
|
||
|
|
|
||
|
|
if atomic.LoadInt64(&count) != 10 {
|
||
|
|
t.Errorf("expected 10 tasks completed, got %d", count)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestWorker_OnStop_Timeout(t *testing.T) {
|
||
|
|
c := startWorker(t, Config{PoolSize: 1, BufferSize: 1,
|
||
|
|
ShutdownTimeout: 50 * time.Millisecond})
|
||
|
|
|
||
|
|
c.Dispatch(func(ctx context.Context) error {
|
||
|
|
time.Sleep(500 * time.Millisecond)
|
||
|
|
return nil
|
||
|
|
})
|
||
|
|
|
||
|
|
start := time.Now()
|
||
|
|
_ = c.OnStop()
|
||
|
|
elapsed := time.Since(start)
|
||
|
|
|
||
|
|
if elapsed > 300*time.Millisecond {
|
||
|
|
t.Errorf("OnStop blocked too long: %v", elapsed)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestWorker_TaskTimeout(t *testing.T) {
|
||
|
|
var ctxCancelled int64
|
||
|
|
c := startWorker(t, Config{
|
||
|
|
PoolSize: 1, BufferSize: 10,
|
||
|
|
TaskTimeout: 50 * time.Millisecond,
|
||
|
|
ShutdownTimeout: time.Second,
|
||
|
|
})
|
||
|
|
|
||
|
|
done := make(chan struct{})
|
||
|
|
c.Dispatch(func(ctx context.Context) error {
|
||
|
|
defer close(done)
|
||
|
|
select {
|
||
|
|
case <-ctx.Done():
|
||
|
|
atomic.StoreInt64(&ctxCancelled, 1)
|
||
|
|
case <-time.After(500 * time.Millisecond):
|
||
|
|
}
|
||
|
|
return nil
|
||
|
|
})
|
||
|
|
|
||
|
|
select {
|
||
|
|
case <-done:
|
||
|
|
case <-time.After(time.Second):
|
||
|
|
t.Fatal("task did not complete in time")
|
||
|
|
}
|
||
|
|
if atomic.LoadInt64(&ctxCancelled) != 1 {
|
||
|
|
t.Error("expected task context to be cancelled by TaskTimeout")
|
||
|
|
}
|
||
|
|
_ = c.OnStop()
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestWorker_MultipleWorkers(t *testing.T) {
|
||
|
|
const n = 5
|
||
|
|
started := make(chan struct{}, n)
|
||
|
|
block := make(chan struct{})
|
||
|
|
|
||
|
|
c := startWorker(t, Config{PoolSize: n, BufferSize: n, ShutdownTimeout: time.Second})
|
||
|
|
|
||
|
|
for i := 0; i < n; i++ {
|
||
|
|
c.Dispatch(func(ctx context.Context) error {
|
||
|
|
started <- struct{}{}
|
||
|
|
<-block
|
||
|
|
return nil
|
||
|
|
})
|
||
|
|
}
|
||
|
|
|
||
|
|
timer := time.NewTimer(time.Second)
|
||
|
|
defer timer.Stop()
|
||
|
|
for i := 0; i < n; i++ {
|
||
|
|
select {
|
||
|
|
case <-started:
|
||
|
|
case <-timer.C:
|
||
|
|
t.Fatalf("only %d/%d workers started concurrently", i, n)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
close(block)
|
||
|
|
_ = c.OnStop()
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestWorker_TaskError(t *testing.T) {
|
||
|
|
c := startWorker(t, Config{PoolSize: 1, BufferSize: 10, ShutdownTimeout: time.Second})
|
||
|
|
|
||
|
|
done := make(chan struct{})
|
||
|
|
c.Dispatch(func(ctx context.Context) error {
|
||
|
|
defer close(done)
|
||
|
|
return errors.New("task error")
|
||
|
|
})
|
||
|
|
|
||
|
|
select {
|
||
|
|
case <-done:
|
||
|
|
case <-time.After(time.Second):
|
||
|
|
t.Fatal("task did not run")
|
||
|
|
}
|
||
|
|
_ = c.OnStop()
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestWorker_Len(t *testing.T) {
|
||
|
|
c := New(newLogger(), Config{PoolSize: 1, BufferSize: 10, ShutdownTimeout: time.Second})
|
||
|
|
if err := c.OnInit(); err != nil {
|
||
|
|
t.Fatalf("OnInit: %v", err)
|
||
|
|
}
|
||
|
|
// Workers are not running yet — queued tasks stay in the channel.
|
||
|
|
blocked := make(chan struct{})
|
||
|
|
c.Dispatch(func(ctx context.Context) error { <-blocked; return nil })
|
||
|
|
c.Dispatch(func(ctx context.Context) error { <-blocked; return nil })
|
||
|
|
if got := c.Len(); got != 2 {
|
||
|
|
t.Errorf("Len() = %d, want 2", got)
|
||
|
|
}
|
||
|
|
close(blocked)
|
||
|
|
_ = c.OnStop()
|
||
|
|
}
|
||
|
|
|
||
|
|
func TestWorker_Lifecycle(t *testing.T) {
|
||
|
|
c := New(newLogger(), Config{PoolSize: 2, BufferSize: 10, ShutdownTimeout: time.Second})
|
||
|
|
if err := c.OnInit(); err != nil {
|
||
|
|
t.Fatalf("OnInit: %v", err)
|
||
|
|
}
|
||
|
|
if err := c.OnStart(); err != nil {
|
||
|
|
t.Fatalf("OnStart: %v", err)
|
||
|
|
}
|
||
|
|
done := make(chan struct{})
|
||
|
|
c.Dispatch(func(ctx context.Context) error { close(done); return nil })
|
||
|
|
select {
|
||
|
|
case <-done:
|
||
|
|
case <-time.After(time.Second):
|
||
|
|
t.Fatal("task not executed")
|
||
|
|
}
|
||
|
|
if err := c.OnStop(); err != nil {
|
||
|
|
t.Fatalf("OnStop: %v", err)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
// --- helpers ---
|
||
|
|
|
||
|
|
type stubLogger struct{}
|
||
|
|
|
||
|
|
func newLogger() *stubLogger { return &stubLogger{} }
|
||
|
|
|
||
|
|
func (s *stubLogger) Debug(msg string, args ...any) {}
|
||
|
|
func (s *stubLogger) Info(msg string, args ...any) {}
|
||
|
|
func (s *stubLogger) Warn(msg string, args ...any) {}
|
||
|
|
func (s *stubLogger) Error(msg string, err error, args ...any) {}
|
||
|
|
func (s *stubLogger) With(args ...any) logging.Logger { return s }
|
||
|
|
func (s *stubLogger) WithContext(ctx context.Context) logging.Logger { return s }
|
||
|
|
|
||
|
|
func startWorker(t *testing.T, cfg Config) Component {
|
||
|
|
t.Helper()
|
||
|
|
c := New(newLogger(), cfg)
|
||
|
|
if err := c.OnInit(); err != nil {
|
||
|
|
t.Fatalf("OnInit: %v", err)
|
||
|
|
}
|
||
|
|
if err := c.OnStart(); err != nil {
|
||
|
|
t.Fatalf("OnStart: %v", err)
|
||
|
|
}
|
||
|
|
return c
|
||
|
|
}
|