commit 2baafa6a0c3356aad67a4bcf8bd293570f51389f Author: Rene Nochebuena Date: Thu Mar 19 13:18:07 2026 +0000 feat(postgres): initial stable release v0.9.0 pgx v5-native PostgreSQL client with launcher lifecycle, health check, unit-of-work via context injection, and structured error mapping. What's included: - Executor / Tx / Client / Component interfaces using pgx native types (pgconn.CommandTag, pgx.Rows, pgx.Row) - New(logger, cfg) constructor; pgxpool initialised in OnInit - Config struct with env-tag support for all pool tuning parameters - UnitOfWork via context injection; GetExecutor(ctx) returns active Tx or pool - HandleError mapping pgerrcode constants to xerrors codes (AlreadyExists, InvalidInput, NotFound, Internal) - health.Checkable at LevelCritical; HealthCheck delegates to pgxpool.Ping Tested-via: todo-api POC integration Reviewed-against: docs/adr/ diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json new file mode 100644 index 0000000..54f5aae --- /dev/null +++ b/.devcontainer/devcontainer.json @@ -0,0 +1,26 @@ +{ + "name": "Go", + "image": "mcr.microsoft.com/devcontainers/go:2-1.25-trixie", + "features": { + "ghcr.io/devcontainers-extra/features/claude-code:1": {} + }, + "forwardPorts": [], + "postCreateCommand": "go version", + "customizations": { + "vscode": { + "settings": { + "files.autoSave": "afterDelay", + "files.autoSaveDelay": 1000, + "explorer.compactFolders": false, + "explorer.showEmptyFolders": true + }, + "extensions": [ + "golang.go", + "eamodio.golang-postfix-completion", + "quicktype.quicktype", + "usernamehw.errorlens" + ] + } + }, + "remoteUser": "vscode" +} \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..221da82 --- /dev/null +++ b/.gitignore @@ -0,0 +1,38 @@ +# Binaries +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with go test -c +*.test + +# Output of go build +*.out + +# Dependency directory +vendor/ + +# Go workspace file +go.work +go.work.sum + +# Environment files +.env +.env.* + +# Editor / IDE +.idea/ +.vscode/ +*.swp +*.swo +*~ + +# OS +.DS_Store +Thumbs.db + +# VCS files +COMMIT.md +RELEASE.md diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..d692476 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,33 @@ +# Changelog + +All notable changes to this module will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this module adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [0.9.0] - 2026-03-18 + +### Added + +- `Executor` interface: `Exec`, `Query`, `QueryRow` using native pgx types (`pgconn.CommandTag`, `pgx.Rows`, `pgx.Row`). +- `Tx` interface: embeds `Executor` and adds `Commit(ctx context.Context) error` and `Rollback(ctx context.Context) error`. +- `Client` interface: `GetExecutor(ctx context.Context) Executor`, `Begin(ctx context.Context) (Tx, error)`, `Ping(ctx context.Context) error`, `HandleError(err error) error`. +- `Component` interface: composes `launcher.Component`, `health.Checkable`, and `Client` into a single injectable dependency. +- `UnitOfWork` interface: `Do(ctx context.Context, fn func(ctx context.Context) error) error`. +- `Config` struct: fields `Host`, `Port`, `User`, `Password`, `Name`, `SSLMode`, `Timezone`, `MaxConns`, `MinConns`, `MaxConnLifetime`, `MaxConnIdleTime`, `HealthCheckPeriod`; all settable via environment variables with `env` struct tags and sensible defaults. +- `Config.DSN() string`: constructs a `postgres://` URL including SSL mode and timezone query parameters. +- `New(logger logz.Logger, cfg Config) Component`: returns a `pgxpool`-backed component; pool is created lazily in `OnInit`. +- Lifecycle hooks: `OnInit` parses config and creates the connection pool with a 30-second timeout; `OnStart` pings the database with a 5-second timeout; `OnStop` closes the pool gracefully. +- `health.Checkable` implementation: `HealthCheck` delegates to `Ping`; `Name()` returns `"postgres"`; `Priority()` returns `health.LevelCritical`. +- `NewUnitOfWork(logger logz.Logger, client Client) UnitOfWork`: wraps a `Client` to provide transactional `Do` semantics; rolls back and logs on error, commits on success. +- `HandleError(err error) error` (package-level function): maps `*pgconn.PgError` codes to xerrors — `UniqueViolation` → `ErrAlreadyExists`, `ForeignKeyViolation` → `ErrInvalidInput`, `CheckViolation` → `ErrInvalidInput`; `pgx.ErrNoRows` → `ErrNotFound`; all other errors → `ErrInternal`. +- Transaction context injection: the active `pgx.Tx` is stored under an unexported `ctxTxKey{}` context key; `GetExecutor` returns the transaction when found, otherwise returns the pool. +- All pool reads guarded by `sync.RWMutex` for safe concurrent access. + +### Design Notes + +- All interfaces use pgx-native types (`pgconn.CommandTag`, `pgx.Rows`, `pgx.Row`) directly; there is no `database/sql` adapter. This is intentional and incompatible with the `mysql` module by design. +- `UnitOfWork.Do` injects the transaction into the context so repositories can call `GetExecutor(ctx)` transparently without knowing whether a transaction is active. +- PostgreSQL error codes are matched via `pgerrcode` constants and `errors.As`, never by parsing error message strings. + +[0.9.0]: https://code.nochebuena.dev/go/postgres/releases/tag/v0.9.0 diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..40ec833 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,70 @@ +# postgres + +pgx-backed PostgreSQL client with launcher lifecycle, health check integration, and unit-of-work transaction management. + +## Purpose + +Provides a `Component` that manages a `pgxpool` connection pool, satisfies the `launcher.Component` lifecycle hooks (`OnInit`, `OnStart`, `OnStop`), and implements `health.Checkable` (priority: critical). Also provides `NewUnitOfWork` for wrapping multiple repository operations in a single transaction via context injection. + +## Tier & Dependencies + +**Tier 3** (infrastructure) — depends on: +- `code.nochebuena.dev/go/health` (Tier 1) +- `code.nochebuena.dev/go/launcher` (Tier 1) +- `code.nochebuena.dev/go/logz` (Tier 0) +- `code.nochebuena.dev/go/xerrors` (Tier 0) +- `github.com/jackc/pgx/v5` and `pgxpool`, `pgconn` sub-packages +- `github.com/jackc/pgerrcode` + +## Key Design Decisions + +- **pgx native types** (ADR-001): `Executor` uses `pgconn.CommandTag`, `pgx.Rows`, and `pgx.Row`. There is no `database/sql` adapter. Repository code imports pgx types directly. +- **Local Executor interface** (ADR-002): `dbutil` was eliminated. `Executor`, `Tx`, `Client`, and `Component` are all defined in this package using pgx types. The `mysql` module has its own independent `Executor`. +- **UnitOfWork via context injection** (ADR-003): The active `pgx.Tx` is stored in the context under `ctxTxKey{}` (unexported). `GetExecutor(ctx)` returns the transaction if present, otherwise the pool. `Tx.Commit` and `Tx.Rollback` both accept `ctx`. +- **Error mapping via pgerrcode constants**: `HandleError` uses `pgerrcode.UniqueViolation`, `pgerrcode.ForeignKeyViolation`, and `pgerrcode.CheckViolation` (not string-matched error messages) to map `*pgconn.PgError` to xerrors codes. +- **`health.Checkable` embedded in Component**: `pgComponent.HealthCheck` delegates to `Ping`. Priority is `health.LevelCritical` — a PostgreSQL outage brings down the service. + +## Patterns + +Lifecycle registration: + +```go +db := postgres.New(logger, cfg) +lc.Append(db) +r.Get("/health", health.NewHandler(logger, db)) +``` + +Unit of Work across multiple repositories: + +```go +uow := postgres.NewUnitOfWork(logger, db) +err := uow.Do(ctx, func(ctx context.Context) error { + exec := db.GetExecutor(ctx) // returns active Tx + _, err := exec.Exec(ctx, "INSERT INTO ...") + return err +}) +``` + +Error handling in repository code: + +```go +row := db.GetExecutor(ctx).QueryRow(ctx, "SELECT ...") +if err := row.Scan(&out); err != nil { + return db.HandleError(err) // maps pgx.ErrNoRows → ErrNotFound, etc. +} +``` + +## What to Avoid + +- Do not use `database/sql` types (`sql.Rows`, `sql.Result`, etc.) alongside this module. The module is pgx-native; mixing the two type systems requires explicit adaptation. +- Do not store `Executor` references across goroutine boundaries during a `UnitOfWork.Do`. The transaction is tied to the context passed into `Do`; goroutines that outlive `Do` will use a committed or rolled-back transaction. +- Do not match PostgreSQL error codes by parsing error message strings. Use `pgerrcode` constants and `errors.As(err, &pgErr)` as `HandleError` does. +- Do not add `sync.Once` or package-level pool variables. `pgComponent` is the unit of construction; create one per database and pass it as a dependency. +- Do not call `db.GetExecutor(ctx)` from outside the `UnitOfWork.Do` callback if you need transactional semantics — it will return the pool. + +## Testing Notes + +- `postgres_test.go` tests compile-time interface satisfaction. Integration tests (pool, real queries) require a live PostgreSQL instance and are typically run in CI with a service container. +- `compliance_test.go` asserts at compile time that `postgres.New(...)` satisfies `postgres.Component`. +- `HandleError` can be unit-tested by constructing `*pgconn.PgError` with known `Code` values — no database connection needed. +- Pool initialization happens in `OnInit`, not `New`. Tests that mock the `Client` interface bypass pool setup entirely. diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..0b33b48 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2026 NOCHEBUENADEV + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..39ce95f --- /dev/null +++ b/README.md @@ -0,0 +1,66 @@ +# postgres + +pgx-backed PostgreSQL client with launcher lifecycle and health check integration. + +## Install + +``` +go get code.nochebuena.dev/go/postgres +``` + +## Usage + +```go +db := postgres.New(logger, cfg) +lc.Append(db) +r.Get("/health", health.NewHandler(logger, db)) +``` + +## Querying + +```go +exec := db.GetExecutor(ctx) // returns pool, or active Tx if inside Do() + +rows, err := exec.Query(ctx, "SELECT id, name FROM users WHERE active = $1", true) +defer rows.Close() +``` + +## Unit of Work + +```go +uow := postgres.NewUnitOfWork(logger, db) + +err := uow.Do(ctx, func(ctx context.Context) error { + exec := db.GetExecutor(ctx) // returns the transaction + _, err := exec.Exec(ctx, "INSERT INTO orders ...") + return err +}) +``` + +## Error mapping + +```go +if err := db.HandleError(err); err != nil { ... } +// or package-level: postgres.HandleError(err) +``` + +| PostgreSQL error | xerrors code | +|---|---| +| `unique_violation` | `ErrAlreadyExists` | +| `foreign_key_violation` | `ErrInvalidInput` | +| `check_violation` | `ErrInvalidInput` | +| `pgx.ErrNoRows` | `ErrNotFound` | +| anything else | `ErrInternal` | + +## Configuration + +| Env var | Default | Description | +|---|---|---| +| `PG_HOST` | required | Database host | +| `PG_PORT` | `5432` | Database port | +| `PG_USER` | required | Username | +| `PG_PASSWORD` | required | Password | +| `PG_NAME` | required | Database name | +| `PG_SSL_MODE` | `disable` | SSL mode | +| `PG_MAX_CONNS` | `5` | Max pool connections | +| `PG_MIN_CONNS` | `2` | Min pool connections | diff --git a/compliance_test.go b/compliance_test.go new file mode 100644 index 0000000..254aea6 --- /dev/null +++ b/compliance_test.go @@ -0,0 +1,9 @@ +package postgres_test + +import ( + "code.nochebuena.dev/go/logz" + "code.nochebuena.dev/go/postgres" +) + +// Compile-time check: New returns a valid Component. +var _ postgres.Component = postgres.New(logz.New(logz.Options{}), postgres.Config{}) diff --git a/doc.go b/doc.go new file mode 100644 index 0000000..2354956 --- /dev/null +++ b/doc.go @@ -0,0 +1,20 @@ +// Package postgres provides a pgx-backed PostgreSQL client with launcher and health integration. +// +// Usage: +// +// db := postgres.New(logger, cfg) +// lc.Append(db) +// r.Get("/health", health.NewHandler(logger, db)) +// +// The component manages a pgxpool connection pool, implements launcher lifecycle hooks, +// and satisfies health.Checkable (priority: critical). +// +// Use [NewUnitOfWork] to wrap operations in a transaction: +// +// uow := postgres.NewUnitOfWork(logger, db) +// uow.Do(ctx, func(ctx context.Context) error { +// exec := db.GetExecutor(ctx) // returns the active transaction +// _, err := exec.Exec(ctx, "INSERT ...") +// return err +// }) +package postgres diff --git a/docs/adr/ADR-001-pgx-native-types.md b/docs/adr/ADR-001-pgx-native-types.md new file mode 100644 index 0000000..30c4d5d --- /dev/null +++ b/docs/adr/ADR-001-pgx-native-types.md @@ -0,0 +1,34 @@ +# ADR-001: pgx Native Types + +**Status:** Accepted +**Date:** 2026-03-18 + +## Context + +Go's standard `database/sql` package provides a database-agnostic interface. Using it with PostgreSQL requires a `database/sql`-compatible driver and means working with `sql.Result`, `*sql.Rows`, and `*sql.Row` — types that were designed for the lowest common denominator across all SQL databases. + +`github.com/jackc/pgx/v5` is a PostgreSQL-specific driver and toolkit that exposes its own richer type system: `pgx.Rows`, `pgx.Row`, and `pgconn.CommandTag`. It provides better performance, native support for PostgreSQL-specific types (arrays, hstore, composite types, etc.), and a more accurate representation of PostgreSQL semantics (e.g., `CommandTag` carries `RowsAffected` as well as the SQL command string). + +The tradeoff is that choosing pgx means explicitly not supporting other databases through the same client type. + +## Decision + +The `postgres` module uses pgx native types throughout its public API. The `Executor` interface uses: + +```go +Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error) +Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error) +QueryRow(ctx context.Context, sql string, args ...any) pgx.Row +``` + +The connection pool is `*pgxpool.Pool` (from `pgx/v5/pgxpool`). The transaction type wraps `pgx.Tx`. There is no `database/sql` adapter layer. + +Repository code in application layers imports `pgx` types directly when scanning rows or reading `CommandTag`. This is an explicit, honest API: it says "this is PostgreSQL via pgx" rather than pretending to be database-agnostic. + +## Consequences + +- **Positive**: Full access to PostgreSQL-specific capabilities (binary encoding, COPY protocol, listen/notify, array types, etc.) without impedance mismatch. +- **Positive**: `pgconn.CommandTag` carries richer information than `sql.Result` (includes the command string, not just rows affected). +- **Positive**: `pgx.Rows` and `pgx.Row` support pgx scan helpers and named arguments. +- **Negative**: Repository code cannot be trivially swapped to use the `mysql` module or any other `database/sql` driver — it imports pgx types. This is acceptable because the tier system isolates database clients at Tier 3; application logic in higher tiers operates through domain interfaces, not directly on `Executor`. +- **Negative**: `pgx.Rows` must be closed after iteration (`defer rows.Close()`). Forgetting this leaks connections. This is the same discipline as `database/sql` but worth noting. diff --git a/docs/adr/ADR-002-local-executor-interface.md b/docs/adr/ADR-002-local-executor-interface.md new file mode 100644 index 0000000..b0fbf74 --- /dev/null +++ b/docs/adr/ADR-002-local-executor-interface.md @@ -0,0 +1,36 @@ +# ADR-002: Local Executor Interface + +**Status:** Accepted +**Date:** 2026-03-18 + +## Context + +The `Executor` interface — the common query interface shared by the connection pool and an active transaction — must be defined somewhere. Earlier iterations of this codebase placed it in a shared `dbutil` package that both `postgres` and `mysql` imported. This created a cross-cutting dependency: every database module depended on `dbutil`, and `dbutil` had to make choices (e.g., which type system to use) that were appropriate for only one of them. + +`dbutil` was eliminated as part of the monorepo refactor (see `plan/decisions.md`). + +## Decision + +The `Executor` interface is defined locally inside the `postgres` package: + +```go +type Executor interface { + Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error) + Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error) + QueryRow(ctx context.Context, sql string, args ...any) pgx.Row +} +``` + +The `mysql` package defines its own separate `Executor` using `database/sql` types. The two are not interchangeable by design — they represent different type systems. + +`Tx` extends `Executor` with `Commit(ctx context.Context) error` and `Rollback(ctx context.Context) error`. `Client` provides `GetExecutor`, `Begin`, `Ping`, and `HandleError`. `Component` composes `Client`, `launcher.Component`, and `health.Checkable`. + +Repository code in application layers should depend on `postgres.Executor` (or the higher-level `postgres.Client`) — not on the concrete `*pgxpool.Pool` or `pgTx` types. + +## Consequences + +- **Positive**: No shared `dbutil` dependency. Each database module owns its interface and can evolve it independently. +- **Positive**: The interface methods use pgx-native types, so there is no impedance mismatch between the interface and the implementation. +- **Positive**: Mocking `postgres.Executor` in tests requires only implementing three methods with pgx return types — no wrapper types needed. +- **Negative**: If a project uses both `postgres` and `mysql`, neither module's `Executor` is compatible with the other. Cross-database abstractions must be built at the application domain interface layer, not by sharing a common `Executor`. +- **Note**: `pgComponent` itself also implements `Executor` directly (forwarding to the pool), which means a `*pgComponent` can be used wherever an `Executor` is expected without calling `GetExecutor`. This is intentional for ergonomics in simple cases where no transaction management is needed. diff --git a/docs/adr/ADR-003-uow-context-injection.md b/docs/adr/ADR-003-uow-context-injection.md new file mode 100644 index 0000000..ac76a3d --- /dev/null +++ b/docs/adr/ADR-003-uow-context-injection.md @@ -0,0 +1,48 @@ +# ADR-003: Unit of Work via Context Injection + +**Status:** Accepted +**Date:** 2026-03-18 + +## Context + +Database transactions must span multiple repository calls without requiring each repository method to accept a `Tx` parameter explicitly. Passing `Tx` as a parameter would leak transaction concepts into repository method signatures and force every call site to decide whether it is inside a transaction. + +An alternative is ambient transaction state stored in a thread-local or goroutine-local variable, but Go has no such construct, and package-level state would break concurrent use. + +## Decision + +The active transaction is stored in the request `context.Context` under an unexported key type `ctxTxKey{}`: + +```go +type ctxTxKey struct{} +``` + +`UnitOfWork.Do` begins a transaction, injects it into the context, and calls the user-supplied function with the enriched context: + +```go +ctx = context.WithValue(ctx, ctxTxKey{}, tx) +fn(ctx) +``` + +`Client.GetExecutor(ctx)` checks the context for an active transaction first: + +```go +if tx, ok := ctx.Value(ctxTxKey{}).(Executor); ok { + return tx +} +// fall back to pool +``` + +If there is no active transaction, `GetExecutor` returns the pool. This means repository code uses `db.GetExecutor(ctx)` uniformly and is agnostic about whether it is inside a transaction. + +`Tx.Commit(ctx)` and `Tx.Rollback(ctx)` both accept `ctx` — this is supported by `pgx.Tx` and matches the overall pgx API convention. + +On function error, `UnitOfWork.Do` calls `Rollback` and returns the original error. Rollback failures are logged but do not replace the original error. + +## Consequences + +- **Positive**: Repository methods need only `ctx context.Context` and `db postgres.Client`; they do not need a separate `Tx` parameter. +- **Positive**: Nesting `UnitOfWork.Do` calls is safe — the inner call will pick up the already-injected transaction from the context, so a single transaction spans all nested calls. (pgx savepoints are not used; the outer transaction is reused.) +- **Positive**: The unexported `ctxTxKey{}` type prevents collisions with other packages that store values in the context. +- **Negative**: The transaction is invisible from a type-system perspective — there is no way to statically verify that a function is called inside a `UnitOfWork.Do`. Violations are runtime errors, not compile-time errors. +- **Negative**: Passing a context that carries a transaction to a goroutine that outlives the `UnitOfWork.Do` call would use a closed transaction. Callers must not spawn goroutines from inside the `Do` function that outlive `Do`. diff --git a/errors.go b/errors.go new file mode 100644 index 0000000..a487ca3 --- /dev/null +++ b/errors.go @@ -0,0 +1,38 @@ +package postgres + +import ( + "errors" + + "github.com/jackc/pgerrcode" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" + + "code.nochebuena.dev/go/xerrors" +) + +// HandleError maps pgx and PostgreSQL errors to xerrors types. +// Also available as client.HandleError(err). +func HandleError(err error) error { + if err == nil { + return nil + } + var pgErr *pgconn.PgError + if errors.As(err, &pgErr) { + switch pgErr.Code { + case pgerrcode.UniqueViolation: + return xerrors.New(xerrors.ErrAlreadyExists, "record already exists"). + WithContext("constraint", pgErr.ConstraintName).WithError(err) + case pgerrcode.ForeignKeyViolation: + return xerrors.New(xerrors.ErrInvalidInput, "data integrity violation"). + WithContext("table", pgErr.TableName).WithError(err) + case pgerrcode.CheckViolation: + return xerrors.New(xerrors.ErrInvalidInput, "data constraint violation"). + WithContext("table", pgErr.TableName). + WithContext("column", pgErr.ColumnName).WithError(err) + } + } + if errors.Is(err, pgx.ErrNoRows) { + return xerrors.New(xerrors.ErrNotFound, "record not found").WithError(err) + } + return xerrors.New(xerrors.ErrInternal, "unexpected database error").WithError(err) +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..775c0e0 --- /dev/null +++ b/go.mod @@ -0,0 +1,20 @@ +module code.nochebuena.dev/go/postgres + +go 1.25 + +require ( + code.nochebuena.dev/go/health v0.9.0 + code.nochebuena.dev/go/launcher v0.9.0 + code.nochebuena.dev/go/logz v0.9.0 + code.nochebuena.dev/go/xerrors v0.9.0 + github.com/jackc/pgerrcode v0.0.0-20250907135507-afb5586c32a6 + github.com/jackc/pgx/v5 v5.8.0 +) + +require ( + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect + golang.org/x/sync v0.17.0 // indirect + golang.org/x/text v0.29.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..a327c8c --- /dev/null +++ b/go.sum @@ -0,0 +1,36 @@ +code.nochebuena.dev/go/health v0.9.0 h1:x0UKjC7CHAE3AgwyFzCyjmGJIjoLBBxeOHxXuqpbKwI= +code.nochebuena.dev/go/health v0.9.0/go.mod h1:f3IsNtU60JSn5yXmBBh9XOvr5pRyEah5+wS4tjDQZso= +code.nochebuena.dev/go/launcher v0.9.0 h1:dJHonA9Xm03AQKK0919FJaQn9ZKHZ+RZfB9yxjnx3TA= +code.nochebuena.dev/go/launcher v0.9.0/go.mod h1:IBtntmbnyddukjEhxlc7Ysdzz9nZsnd9+8FzAIHt77g= +code.nochebuena.dev/go/logz v0.9.0 h1:wfV7vtI4V/8ED7Hm31Fbql7Y5iOGrlHN4X8Z5ajTZZE= +code.nochebuena.dev/go/logz v0.9.0/go.mod h1:qODhSbKb+tWE7rdhHLcKweiP5CgwIaWoZxadCT3bQV8= +code.nochebuena.dev/go/xerrors v0.9.0 h1:8wrDto7e44ZW1YPOnT6JrxYXTqnvNuKpAO1/5bcT4TE= +code.nochebuena.dev/go/xerrors v0.9.0/go.mod h1:mtXo7xscBreCB7w7smlBP5Onv8H1HVohCvF0I/VXbAY= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/jackc/pgerrcode v0.0.0-20250907135507-afb5586c32a6 h1:D/V0gu4zQ3cL2WKeVNVM4r2gLxGGf6McLwgXzRTo2RQ= +github.com/jackc/pgerrcode v0.0.0-20250907135507-afb5586c32a6/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.8.0 h1:TYPDoleBBme0xGSAX3/+NujXXtpZn9HBONkQC7IEZSo= +github.com/jackc/pgx/v5 v5.8.0/go.mod h1:QVeDInX2m9VyzvNeiCJVjCkNFqzsNb43204HshNSZKw= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= +golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/postgres.go b/postgres.go new file mode 100644 index 0000000..3c5d4f5 --- /dev/null +++ b/postgres.go @@ -0,0 +1,279 @@ +package postgres + +import ( + "context" + "fmt" + "net/url" + "sync" + "time" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" + "github.com/jackc/pgx/v5/pgxpool" + + "code.nochebuena.dev/go/health" + "code.nochebuena.dev/go/launcher" + "code.nochebuena.dev/go/logz" +) + +// Executor is the shared query interface for pool and transaction. +type Executor interface { + Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error) + Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error) + QueryRow(ctx context.Context, sql string, args ...any) pgx.Row +} + +// Tx extends Executor with commit/rollback. +type Tx interface { + Executor + Commit(ctx context.Context) error + Rollback(ctx context.Context) error +} + +// Client is the database access interface. +type Client interface { + // GetExecutor returns the active transaction from ctx if one exists, + // otherwise returns the pool. + GetExecutor(ctx context.Context) Executor + Begin(ctx context.Context) (Tx, error) + Ping(ctx context.Context) error + HandleError(err error) error +} + +// Component bundles launcher lifecycle, health check, and database client. +type Component interface { + launcher.Component + health.Checkable + Client +} + +// UnitOfWork wraps operations in a single database transaction. +type UnitOfWork interface { + Do(ctx context.Context, fn func(ctx context.Context) error) error +} + +// Config holds PostgreSQL connection settings. +type Config struct { + Host string `env:"PG_HOST,required"` + Port int `env:"PG_PORT" envDefault:"5432"` + User string `env:"PG_USER,required"` + Password string `env:"PG_PASSWORD,required"` + Name string `env:"PG_NAME,required"` + SSLMode string `env:"PG_SSL_MODE" envDefault:"disable"` + Timezone string `env:"PG_TIMEZONE" envDefault:"UTC"` + MaxConns int `env:"PG_MAX_CONNS" envDefault:"5"` + MinConns int `env:"PG_MIN_CONNS" envDefault:"2"` + MaxConnLifetime string `env:"PG_MAX_CONN_LIFETIME" envDefault:"1h"` + MaxConnIdleTime string `env:"PG_MAX_CONN_IDLE_TIME" envDefault:"30m"` + HealthCheckPeriod string `env:"PG_HEALTH_CHECK_PERIOD" envDefault:"1m"` +} + +// DSN constructs a PostgreSQL connection string from the configuration. +func (c Config) DSN() string { + u := &url.URL{ + Scheme: "postgres", + User: url.UserPassword(c.User, c.Password), + Host: fmt.Sprintf("%s:%d", c.Host, c.Port), + Path: "/" + c.Name, + } + q := u.Query() + q.Set("sslmode", c.SSLMode) + q.Set("timezone", c.Timezone) + u.RawQuery = q.Encode() + return u.String() +} + +// ctxTxKey is the context key for the active transaction. +type ctxTxKey struct{} + +// --- pgComponent --- + +type pgComponent struct { + logger logz.Logger + cfg Config + pool *pgxpool.Pool + mu sync.RWMutex +} + +// New returns a postgres Component. Call lc.Append(db) to manage its lifecycle. +func New(logger logz.Logger, cfg Config) Component { + return &pgComponent{logger: logger, cfg: cfg} +} + +func (c *pgComponent) OnInit() error { + poolCfg, err := c.buildPoolConfig() + if err != nil { + return fmt.Errorf("postgres: parse config: %w", err) + } + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + pool, err := pgxpool.NewWithConfig(ctx, poolCfg) + if err != nil { + c.logger.Error("postgres: failed to create pool", err) + return fmt.Errorf("postgres: create pool: %w", err) + } + c.mu.Lock() + c.pool = pool + c.mu.Unlock() + return nil +} + +func (c *pgComponent) OnStart() error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := c.Ping(ctx); err != nil { + return fmt.Errorf("postgres: ping failed: %w", err) + } + c.logger.Info("postgres: connected") + return nil +} + +func (c *pgComponent) OnStop() error { + c.mu.Lock() + defer c.mu.Unlock() + if c.pool != nil { + c.logger.Info("postgres: closing pool") + c.pool.Close() + c.pool = nil + } + return nil +} + +func (c *pgComponent) Ping(ctx context.Context) error { + c.mu.RLock() + pool := c.pool + c.mu.RUnlock() + if pool == nil { + return fmt.Errorf("postgres: pool not initialized") + } + return pool.Ping(ctx) +} + +func (c *pgComponent) GetExecutor(ctx context.Context) Executor { + if tx, ok := ctx.Value(ctxTxKey{}).(Executor); ok { + return tx + } + c.mu.RLock() + pool := c.pool + c.mu.RUnlock() + return pool +} + +func (c *pgComponent) Begin(ctx context.Context) (Tx, error) { + c.mu.RLock() + pool := c.pool + c.mu.RUnlock() + if pool == nil { + return nil, fmt.Errorf("postgres: pool not initialized") + } + tx, err := pool.Begin(ctx) + if err != nil { + return nil, err + } + return &pgTx{Tx: tx}, nil +} + +func (c *pgComponent) Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error) { + c.mu.RLock() + pool := c.pool + c.mu.RUnlock() + return pool.Exec(ctx, sql, args...) +} + +func (c *pgComponent) Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error) { + c.mu.RLock() + pool := c.pool + c.mu.RUnlock() + return pool.Query(ctx, sql, args...) +} + +func (c *pgComponent) QueryRow(ctx context.Context, sql string, args ...any) pgx.Row { + c.mu.RLock() + pool := c.pool + c.mu.RUnlock() + return pool.QueryRow(ctx, sql, args...) +} + +func (c *pgComponent) HandleError(err error) error { return HandleError(err) } + +// health.Checkable +func (c *pgComponent) HealthCheck(ctx context.Context) error { return c.Ping(ctx) } +func (c *pgComponent) Name() string { return "postgres" } +func (c *pgComponent) Priority() health.Level { return health.LevelCritical } + +func (c *pgComponent) buildPoolConfig() (*pgxpool.Config, error) { + cfg, err := pgxpool.ParseConfig(c.cfg.DSN()) + if err != nil { + return nil, err + } + cfg.MaxConns = int32(c.cfg.MaxConns) + cfg.MinConns = int32(c.cfg.MinConns) + if c.cfg.MaxConnLifetime != "" { + d, err := time.ParseDuration(c.cfg.MaxConnLifetime) + if err != nil { + return nil, fmt.Errorf("PG_MAX_CONN_LIFETIME: %w", err) + } + cfg.MaxConnLifetime = d + } + if c.cfg.MaxConnIdleTime != "" { + d, err := time.ParseDuration(c.cfg.MaxConnIdleTime) + if err != nil { + return nil, fmt.Errorf("PG_MAX_CONN_IDLE_TIME: %w", err) + } + cfg.MaxConnIdleTime = d + } + if c.cfg.HealthCheckPeriod != "" { + d, err := time.ParseDuration(c.cfg.HealthCheckPeriod) + if err != nil { + return nil, fmt.Errorf("PG_HEALTH_CHECK_PERIOD: %w", err) + } + cfg.HealthCheckPeriod = d + } + return cfg, nil +} + +// --- pgTx --- + +type pgTx struct{ pgx.Tx } + +func (t *pgTx) Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error) { + return t.Tx.Exec(ctx, sql, args...) +} + +func (t *pgTx) Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error) { + return t.Tx.Query(ctx, sql, args...) +} + +func (t *pgTx) QueryRow(ctx context.Context, sql string, args ...any) pgx.Row { + return t.Tx.QueryRow(ctx, sql, args...) +} + +func (t *pgTx) Commit(ctx context.Context) error { return t.Tx.Commit(ctx) } +func (t *pgTx) Rollback(ctx context.Context) error { return t.Tx.Rollback(ctx) } + +// --- UnitOfWork --- + +type unitOfWork struct { + logger logz.Logger + client Client +} + +// NewUnitOfWork returns a UnitOfWork backed by the given client. +func NewUnitOfWork(logger logz.Logger, client Client) UnitOfWork { + return &unitOfWork{logger: logger, client: client} +} + +func (u *unitOfWork) Do(ctx context.Context, fn func(ctx context.Context) error) error { + tx, err := u.client.Begin(ctx) + if err != nil { + return fmt.Errorf("postgres: begin transaction: %w", err) + } + ctx = context.WithValue(ctx, ctxTxKey{}, tx) + if err := fn(ctx); err != nil { + if rbErr := tx.Rollback(ctx); rbErr != nil { + u.logger.Error("postgres: rollback failed", rbErr) + } + return err + } + return tx.Commit(ctx) +} diff --git a/postgres_test.go b/postgres_test.go new file mode 100644 index 0000000..b110298 --- /dev/null +++ b/postgres_test.go @@ -0,0 +1,175 @@ +package postgres + +import ( + "context" + "errors" + "testing" + + "github.com/jackc/pgerrcode" + pgx "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" + + "code.nochebuena.dev/go/health" + "code.nochebuena.dev/go/logz" + "code.nochebuena.dev/go/xerrors" +) + +func newLogger() logz.Logger { return logz.New(logz.Options{}) } + +// --- New / name / priority --- + +func TestNew(t *testing.T) { + if New(newLogger(), Config{}) == nil { + t.Fatal("New returned nil") + } +} + +func TestComponent_Name(t *testing.T) { + c := New(newLogger(), Config{}).(health.Checkable) + if c.Name() != "postgres" { + t.Error("Name() != postgres") + } +} + +func TestComponent_Priority(t *testing.T) { + c := New(newLogger(), Config{}).(health.Checkable) + if c.Priority() != health.LevelCritical { + t.Error("Priority() != LevelCritical") + } +} + +func TestComponent_OnStop_NilPool(t *testing.T) { + c := &pgComponent{logger: newLogger()} + if err := c.OnStop(); err != nil { + t.Errorf("OnStop with nil pool: %v", err) + } +} + +// --- Config.DSN --- + +func TestConfig_DSN(t *testing.T) { + cfg := Config{ + Host: "localhost", Port: 5432, + User: "user", Password: "pass", + Name: "mydb", SSLMode: "disable", Timezone: "UTC", + } + dsn := cfg.DSN() + for _, want := range []string{"localhost:5432", "mydb", "sslmode=disable"} { + if !strContains(dsn, want) { + t.Errorf("DSN %q missing %q", dsn, want) + } + } +} + +// --- HandleError --- + +func TestHandleError_Nil(t *testing.T) { + if err := HandleError(nil); err != nil { + t.Errorf("want nil, got %v", err) + } +} + +func TestHandleError_UniqueViolation(t *testing.T) { + assertCode(t, HandleError(&pgconn.PgError{Code: pgerrcode.UniqueViolation}), xerrors.ErrAlreadyExists) +} + +func TestHandleError_ForeignKey(t *testing.T) { + assertCode(t, HandleError(&pgconn.PgError{Code: pgerrcode.ForeignKeyViolation}), xerrors.ErrInvalidInput) +} + +func TestHandleError_CheckViolation(t *testing.T) { + assertCode(t, HandleError(&pgconn.PgError{Code: pgerrcode.CheckViolation}), xerrors.ErrInvalidInput) +} + +func TestHandleError_NoRows(t *testing.T) { + assertCode(t, HandleError(pgx.ErrNoRows), xerrors.ErrNotFound) +} + +func TestHandleError_Generic(t *testing.T) { + assertCode(t, HandleError(errors.New("boom")), xerrors.ErrInternal) +} + +// --- UnitOfWork --- + +type mockTx struct{ committed, rolledBack bool } + +func (m *mockTx) Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error) { + return pgconn.CommandTag{}, nil +} +func (m *mockTx) Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error) { + return nil, nil +} +func (m *mockTx) QueryRow(ctx context.Context, sql string, args ...any) pgx.Row { return nil } +func (m *mockTx) Commit(ctx context.Context) error { m.committed = true; return nil } +func (m *mockTx) Rollback(ctx context.Context) error { m.rolledBack = true; return nil } + +type mockClient struct{ tx *mockTx } + +func (m *mockClient) Begin(ctx context.Context) (Tx, error) { return m.tx, nil } +func (m *mockClient) Ping(ctx context.Context) error { return nil } +func (m *mockClient) HandleError(err error) error { return HandleError(err) } +func (m *mockClient) GetExecutor(ctx context.Context) Executor { + if tx, ok := ctx.Value(ctxTxKey{}).(Executor); ok { + return tx + } + return nil +} + +func TestUnitOfWork_Commit(t *testing.T) { + tx := &mockTx{} + uow := NewUnitOfWork(newLogger(), &mockClient{tx: tx}) + if err := uow.Do(context.Background(), func(ctx context.Context) error { return nil }); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !tx.committed { + t.Error("expected Commit") + } +} + +func TestUnitOfWork_Rollback(t *testing.T) { + tx := &mockTx{} + uow := NewUnitOfWork(newLogger(), &mockClient{tx: tx}) + _ = uow.Do(context.Background(), func(ctx context.Context) error { return errors.New("fail") }) + if !tx.rolledBack { + t.Error("expected Rollback") + } +} + +func TestUnitOfWork_InjectsExecutor(t *testing.T) { + tx := &mockTx{} + client := &mockClient{tx: tx} + uow := NewUnitOfWork(newLogger(), client) + var got Executor + _ = uow.Do(context.Background(), func(ctx context.Context) error { + got = client.GetExecutor(ctx) + return nil + }) + if got != tx { + t.Error("GetExecutor should return the injected Tx") + } +} + +// --- helpers --- + +func assertCode(t *testing.T, err error, want xerrors.Code) { + t.Helper() + var xe *xerrors.Err + if !errors.As(err, &xe) { + t.Fatalf("expected *xerrors.Err, got %T: %v", err, err) + } + if xe.Code() != want { + t.Errorf("want code %s, got %s", want, xe.Code()) + } +} + +func strContains(s, sub string) bool { + if len(sub) == 0 { + return true + } + for i := 0; i <= len(s)-len(sub); i++ { + if s[i:i+len(sub)] == sub { + return true + } + } + return false +}