feat(db-postgres): initial implementation — pgx pool with lifecycle and UnitOfWork (v1.0.0)
Introduces code.nochebuena.dev/einherjar/db-postgres — the PostgreSQL database starter for the Einherjar framework. Absorbs the postgres package from micro-lib, replacing fmt.Errorf wrapping with core/xerrors and migrating from pgx v4 to pgx v5. Interfaces (CT-6: one TypeSpec per file): - Executor — Exec, Query, QueryRow (pgx-native types) - Tx — Executor + Commit(ctx), Rollback(ctx) - Provider — GetExecutor, Begin, BeginTx, Ping, HandleError - Component — lifecycle.Component + observability.Checkable + Provider + Stats() - UnitOfWork — Do(ctx, fn) Implementation: - New(logger, cfg) Component — pool not created until OnInit - OnInit: pgxpool.NewWithConfig with duration parsing; 30s timeout - OnStart: PING with 5s timeout; logs "postgres: connected" - OnStop: pool.Close(); logs "postgres: closing pool" - GetExecutor: returns active Tx from context (ctxTxKey) or pool; nil-safe - Begin/BeginTx: wraps pgx.TxOptions; wrapped in xerrors on error - HealthCheck: delegates to Ping; Priority LevelCritical - Stats() *pgxpool.Stat — zero value when pool uninitialized - NewUnitOfWork(logger, provider) UnitOfWork — Begin+inject+commit/rollback - HandleError: UniqueViolation→ErrAlreadyExists, ForeignKey/Check→ErrInvalidInput, pgx.ErrNoRows→ErrNotFound, all others→ErrInternal Config (EINHERJAR_PG_* env vars): Host, Port(5432), User, Password, Name, SSLMode(disable), Timezone(UTC), MaxConns(5), MinConns(2), MaxConnLifetime(1h), MaxConnIdleTime(30m), HealthCheckPeriod(1m) - Component interface embeds observability.Identifiable; identifiable.go implements ModulePath and ModuleVersion via runtime/debug.ReadBuildInfo() — prints in launcher banner
This commit is contained in:
231
new.go
Normal file
231
new.go
Normal file
@@ -0,0 +1,231 @@
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgconn"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
|
||||
"code.nochebuena.dev/einherjar/contracts/logging"
|
||||
"code.nochebuena.dev/einherjar/contracts/observability"
|
||||
"code.nochebuena.dev/einherjar/core/xerrors"
|
||||
)
|
||||
|
||||
// Compile-time interface verification (I-8 / CT-5).
|
||||
var _ Component = (*pgComponent)(nil)
|
||||
var _ observability.Identifiable = (*pgComponent)(nil)
|
||||
var _ Tx = (*pgTx)(nil)
|
||||
var _ UnitOfWork = (*unitOfWork)(nil)
|
||||
|
||||
// New returns a Component backed by the given configuration.
|
||||
// The pool is not created until OnInit is called.
|
||||
func New(logger logging.Logger, cfg Config) Component {
|
||||
return &pgComponent{logger: logger, cfg: cfg}
|
||||
}
|
||||
|
||||
// NewUnitOfWork returns a UnitOfWork backed by the given client.
|
||||
func NewUnitOfWork(logger logging.Logger, client Provider) UnitOfWork {
|
||||
return &unitOfWork{logger: logger, client: client}
|
||||
}
|
||||
|
||||
// ctxTxKey is the context key for the active transaction.
|
||||
type ctxTxKey struct{}
|
||||
|
||||
// --- pgComponent ---
|
||||
|
||||
type pgComponent struct {
|
||||
logger logging.Logger
|
||||
cfg Config
|
||||
pool *pgxpool.Pool
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
func (c *pgComponent) OnInit() error {
|
||||
poolCfg, err := c.buildPoolConfig()
|
||||
if err != nil {
|
||||
return xerrors.New(xerrors.ErrInvalidInput, "postgres: parse config").WithError(err)
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
pool, err := pgxpool.NewWithConfig(ctx, poolCfg)
|
||||
if err != nil {
|
||||
c.logger.Error("postgres: failed to create pool", err)
|
||||
return xerrors.New(xerrors.ErrInternal, "postgres: create pool").WithError(err)
|
||||
}
|
||||
c.mu.Lock()
|
||||
c.pool = pool
|
||||
c.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *pgComponent) OnStart() error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
if err := c.Ping(ctx); err != nil {
|
||||
return xerrors.New(xerrors.ErrUnavailable, "postgres: ping failed").WithError(err)
|
||||
}
|
||||
c.logger.Info("postgres: connected")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *pgComponent) OnStop() error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if c.pool != nil {
|
||||
c.logger.Info("postgres: closing pool")
|
||||
c.pool.Close()
|
||||
c.pool = nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *pgComponent) Ping(ctx context.Context) error {
|
||||
c.mu.RLock()
|
||||
pool := c.pool
|
||||
c.mu.RUnlock()
|
||||
if pool == nil {
|
||||
return xerrors.New(xerrors.ErrInternal, "postgres: pool not initialized")
|
||||
}
|
||||
return pool.Ping(ctx)
|
||||
}
|
||||
|
||||
func (c *pgComponent) GetExecutor(ctx context.Context) Executor {
|
||||
if tx, ok := ctx.Value(ctxTxKey{}).(Executor); ok {
|
||||
return tx
|
||||
}
|
||||
c.mu.RLock()
|
||||
pool := c.pool
|
||||
c.mu.RUnlock()
|
||||
return pool
|
||||
}
|
||||
|
||||
func (c *pgComponent) BeginTx(ctx context.Context, opts pgx.TxOptions) (Tx, error) {
|
||||
c.mu.RLock()
|
||||
pool := c.pool
|
||||
c.mu.RUnlock()
|
||||
if pool == nil {
|
||||
return nil, xerrors.New(xerrors.ErrInternal, "postgres: pool not initialized")
|
||||
}
|
||||
tx, err := pool.BeginTx(ctx, opts)
|
||||
if err != nil {
|
||||
return nil, xerrors.New(xerrors.ErrInternal, "postgres: begin transaction").WithError(err)
|
||||
}
|
||||
return &pgTx{Tx: tx}, nil
|
||||
}
|
||||
|
||||
func (c *pgComponent) Begin(ctx context.Context) (Tx, error) {
|
||||
return c.BeginTx(ctx, pgx.TxOptions{})
|
||||
}
|
||||
|
||||
func (c *pgComponent) Stats() *pgxpool.Stat {
|
||||
c.mu.RLock()
|
||||
pool := c.pool
|
||||
c.mu.RUnlock()
|
||||
if pool == nil {
|
||||
return &pgxpool.Stat{}
|
||||
}
|
||||
return pool.Stat()
|
||||
}
|
||||
|
||||
func (c *pgComponent) Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error) {
|
||||
c.mu.RLock()
|
||||
pool := c.pool
|
||||
c.mu.RUnlock()
|
||||
return pool.Exec(ctx, sql, args...)
|
||||
}
|
||||
|
||||
func (c *pgComponent) Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error) {
|
||||
c.mu.RLock()
|
||||
pool := c.pool
|
||||
c.mu.RUnlock()
|
||||
return pool.Query(ctx, sql, args...)
|
||||
}
|
||||
|
||||
func (c *pgComponent) QueryRow(ctx context.Context, sql string, args ...any) pgx.Row {
|
||||
c.mu.RLock()
|
||||
pool := c.pool
|
||||
c.mu.RUnlock()
|
||||
return pool.QueryRow(ctx, sql, args...)
|
||||
}
|
||||
|
||||
func (c *pgComponent) HandleError(err error) error { return HandleError(err) }
|
||||
|
||||
func (c *pgComponent) HealthCheck(ctx context.Context) error { return c.Ping(ctx) }
|
||||
func (c *pgComponent) Name() string { return "postgres" }
|
||||
func (c *pgComponent) Priority() observability.Level { return observability.LevelCritical }
|
||||
|
||||
func (c *pgComponent) buildPoolConfig() (*pgxpool.Config, error) {
|
||||
cfg, err := pgxpool.ParseConfig(c.cfg.DSN())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cfg.MaxConns = int32(c.cfg.MaxConns)
|
||||
cfg.MinConns = int32(c.cfg.MinConns)
|
||||
if c.cfg.MaxConnLifetime != "" {
|
||||
d, err := time.ParseDuration(c.cfg.MaxConnLifetime)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("PG_MAX_CONN_LIFETIME: %w", err)
|
||||
}
|
||||
cfg.MaxConnLifetime = d
|
||||
}
|
||||
if c.cfg.MaxConnIdleTime != "" {
|
||||
d, err := time.ParseDuration(c.cfg.MaxConnIdleTime)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("PG_MAX_CONN_IDLE_TIME: %w", err)
|
||||
}
|
||||
cfg.MaxConnIdleTime = d
|
||||
}
|
||||
if c.cfg.HealthCheckPeriod != "" {
|
||||
d, err := time.ParseDuration(c.cfg.HealthCheckPeriod)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("PG_HEALTH_CHECK_PERIOD: %w", err)
|
||||
}
|
||||
cfg.HealthCheckPeriod = d
|
||||
}
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
// --- pgTx ---
|
||||
|
||||
type pgTx struct{ pgx.Tx }
|
||||
|
||||
func (t *pgTx) Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error) {
|
||||
return t.Tx.Exec(ctx, sql, args...)
|
||||
}
|
||||
|
||||
func (t *pgTx) Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error) {
|
||||
return t.Tx.Query(ctx, sql, args...)
|
||||
}
|
||||
|
||||
func (t *pgTx) QueryRow(ctx context.Context, sql string, args ...any) pgx.Row {
|
||||
return t.Tx.QueryRow(ctx, sql, args...)
|
||||
}
|
||||
|
||||
func (t *pgTx) Commit(ctx context.Context) error { return t.Tx.Commit(ctx) }
|
||||
func (t *pgTx) Rollback(ctx context.Context) error { return t.Tx.Rollback(ctx) }
|
||||
|
||||
// --- unitOfWork ---
|
||||
|
||||
type unitOfWork struct {
|
||||
logger logging.Logger
|
||||
client Provider
|
||||
}
|
||||
|
||||
func (u *unitOfWork) Do(ctx context.Context, fn func(ctx context.Context) error) error {
|
||||
tx, err := u.client.Begin(ctx)
|
||||
if err != nil {
|
||||
return xerrors.New(xerrors.ErrInternal, "postgres: begin transaction").WithError(err)
|
||||
}
|
||||
ctx = context.WithValue(ctx, ctxTxKey{}, tx)
|
||||
if err := fn(ctx); err != nil {
|
||||
if rbErr := tx.Rollback(ctx); rbErr != nil {
|
||||
u.logger.Error("postgres: rollback failed", rbErr)
|
||||
}
|
||||
return err
|
||||
}
|
||||
return tx.Commit(ctx)
|
||||
}
|
||||
Reference in New Issue
Block a user