Files
cache-valkey/new.go
Rene Nochebuena df7aa63e5c feat(cache-valkey): initial implementation — Provider, adapters (v1.0.0)
Introduces code.nochebuena.dev/einherjar/cache-valkey — the Valkey cache starter
for the Einherjar framework. Absorbs the valkey package from micro-lib and adds
three duck-typed adapters that wire directly into auth, web, and auth-jwt.

Core:
- Provider interface — Get, Set, Del, Exists, Expire, IncrWithTTL, Native()
- Component interface — lifecycle.Component + observability.Checkable + Provider
- Config struct (EINHERJAR_VALKEY_* env vars)
- New(logger, cfg) Component — creates valkey-go client in OnInit;
  PING in OnStart; logs "valkey: connected"
- IncrWithTTL implemented with Lua script (atomic INCR + conditional EXPIRE);
  race-free fixed-window semantics with no MULTI/EXEC overhead
- Priority: LevelDegraded — Valkey outage degrades, does not halt the service

Adapters (duck-typed — no import of auth, web, or auth-jwt):
- PermissionCache — Get/Set int64 bitmasks as string; satisfies auth/rbac.Cache
- RateLimiterStore — fixed-window via IncrWithTTL; satisfies web/mw.RateLimiterStore
- Blacklist — Exists/Set "1" with TTL; satisfies auth-jwt.Blacklist

Compliance test verifies CT-6, duck-type shape assignments, and full adapter
behavioural coverage with a mockProvider (no live server required).

- Component interface embeds observability.Identifiable; identifiable.go implements
  ModulePath and ModuleVersion via runtime/debug.ReadBuildInfo() — both are printed in the launcher banner
2026-05-29 15:58:56 +00:00

142 lines
4.0 KiB
Go

package cachevalkey
import (
"context"
"strconv"
"time"
vk "github.com/valkey-io/valkey-go"
"code.nochebuena.dev/einherjar/contracts/logging"
"code.nochebuena.dev/einherjar/contracts/observability"
"code.nochebuena.dev/einherjar/core/xerrors"
)
// Compile-time assertions: *vkImpl must satisfy both the package's Component
// interface and observability.Identifiable.
var (
	_ Component                  = (*vkImpl)(nil)
	_ observability.Identifiable = (*vkImpl)(nil)
)
// New builds a Component that remembers the supplied logger and configuration.
// No client is created here; that happens in OnInit.
// Register the returned value with launcher and health before starting.
func New(logger logging.Logger, cfg Config) Component {
	impl := new(vkImpl)
	impl.logger = logger
	impl.cfg = cfg
	return impl
}
// vkImpl is the Component implementation returned by New. It wraps a
// valkey-go client together with the configuration used to create it.
type vkImpl struct {
	// cfg is the configuration passed to New; OnInit reads it to build the client.
	cfg Config
	// logger receives the lifecycle messages written by OnStart and OnStop.
	logger logging.Logger
	// client stays nil until OnInit has completed successfully.
	client vk.Client
}
// OnInit creates the valkey-go client from the stored configuration and keeps
// it on the receiver. The per-connection cache size is only overridden when
// CacheSizeEachConn is positive; the configured value is then multiplied by
// 1024*1024 before being placed in the client options.
//
// A failure to create the client is returned as an internal error wrapping
// the underlying cause, and the receiver's client is left untouched.
func (v *vkImpl) OnInit() error {
	var opts vk.ClientOption
	opts.InitAddress = v.cfg.Addrs
	opts.Password = v.cfg.Password
	opts.SelectDB = v.cfg.SelectDB
	if size := v.cfg.CacheSizeEachConn; size > 0 {
		opts.CacheSizeEachConn = size * 1024 * 1024
	}

	created, err := vk.NewClient(opts)
	if err != nil {
		return xerrors.Internal("valkey: failed to create client").WithError(err)
	}

	v.client = created
	return nil
}
// OnStart verifies connectivity by sending a PING with a background context.
// It fails when OnInit has not produced a client or when the PING reports an
// error; on success it logs "valkey: connected".
func (v *vkImpl) OnStart() error {
	c := v.client
	if c == nil {
		return xerrors.Internal("valkey: client not initialized")
	}

	ping := c.B().Ping().Build()
	err := c.Do(context.Background(), ping).Error()
	if err != nil {
		return xerrors.Internal("valkey: ping failed").WithError(err)
	}

	v.logger.Info("valkey: connected")
	return nil
}
// OnStop logs the shutdown and closes the client when one exists.
// It always returns nil.
func (v *vkImpl) OnStop() error {
	v.logger.Info("valkey: closing client")
	if v.client == nil {
		// Nothing was initialized, so there is nothing to close.
		return nil
	}
	v.client.Close()
	return nil
}
// Name returns the fixed identifier of this component, "valkey".
func (v *vkImpl) Name() string {
	const componentName = "valkey"
	return componentName
}
// Priority reports observability.LevelDegraded for this component.
func (v *vkImpl) Priority() observability.Level {
	return observability.LevelDegraded
}
// Native exposes the underlying valkey-go client. The result is nil until
// OnInit has stored a client on the receiver.
func (v *vkImpl) Native() vk.Client {
	return v.client
}
// HealthCheck sends a PING using ctx and returns the resulting error as-is
// (nil on success). When no client has been initialized it returns an
// internal error instead of issuing a command.
func (v *vkImpl) HealthCheck(ctx context.Context) error {
	c := v.client
	if c == nil {
		return xerrors.Internal("valkey: client not initialized")
	}

	reply := c.Do(ctx, c.B().Ping().Build())
	return reply.Error()
}
// Get fetches the string stored at key.
//
// The boolean result reports whether the key was found: a nil reply from the
// server yields ("", false, nil), while any other failure is returned as an
// internal error wrapping the cause.
func (v *vkImpl) Get(ctx context.Context, key string) (string, bool, error) {
	cmd := v.client.B().Get().Key(key).Build()
	stored, err := v.client.Do(ctx, cmd).ToString()

	switch {
	case err == nil:
		return stored, true, nil
	case vk.IsValkeyNil(err):
		// A missing key is not an error for callers.
		return "", false, nil
	default:
		return "", false, xerrors.Internal("valkey: Get failed").WithError(err)
	}
}
// Set stores value at key.
//
// When ttl is positive the key is written with an EX expiry; otherwise it is
// written without any expiry. Because EX is expressed in whole seconds, a ttl
// that is not a whole number of seconds is rounded up to the next second, so
// a positive sub-second ttl is never reduced to a zero-second expiry.
//
// Any command failure is returned as an internal error wrapping the cause.
func (v *vkImpl) Set(ctx context.Context, key string, value string, ttl time.Duration) error {
	var cmd vk.Completed
	if ttl > 0 {
		// Round up to whole seconds: the key must live at least as long as requested.
		expiry := ttl.Truncate(time.Second)
		if expiry < ttl {
			expiry += time.Second
		}
		cmd = v.client.B().Set().Key(key).Value(value).Ex(expiry).Build()
	} else {
		cmd = v.client.B().Set().Key(key).Value(value).Build()
	}

	if err := v.client.Do(ctx, cmd).Error(); err != nil {
		return xerrors.Internal("valkey: Set failed").WithError(err)
	}
	return nil
}
// Del removes the given keys with a single DEL command.
//
// Calling Del without any keys is a no-op that returns nil: DEL requires at
// least one key, so sending an empty command would only produce a server
// error. Any command failure is returned as an internal error wrapping the
// cause.
func (v *vkImpl) Del(ctx context.Context, keys ...string) error {
	if len(keys) == 0 {
		// Nothing to delete; avoid issuing an argument-less DEL.
		return nil
	}

	cmd := v.client.B().Del().Key(keys...).Build()
	if err := v.client.Do(ctx, cmd).Error(); err != nil {
		return xerrors.Internal("valkey: Del failed").WithError(err)
	}
	return nil
}
// Exists reports whether key is present, based on the integer reply of the
// EXISTS command being greater than zero. A command or decoding failure is
// returned as an internal error wrapping the cause.
func (v *vkImpl) Exists(ctx context.Context, key string) (bool, error) {
	cmd := v.client.B().Exists().Key(key).Build()
	count, err := v.client.Do(ctx, cmd).AsInt64()
	if err != nil {
		return false, xerrors.Internal("valkey: Exists failed").WithError(err)
	}

	found := count > 0
	return found, nil
}
// Expire sets the time-to-live of key using the EXPIRE command.
//
// EXPIRE takes whole seconds, so a positive ttl with a fractional part is
// rounded up to the next second. Without the rounding, a sub-second ttl would
// be sent as 0 seconds. Zero and negative ttl values are passed through
// unchanged (truncated toward zero), exactly as before.
//
// Any command failure is returned as an internal error wrapping the cause.
func (v *vkImpl) Expire(ctx context.Context, key string, ttl time.Duration) error {
	seconds := int64(ttl / time.Second)
	if ttl%time.Second > 0 {
		// Only positive remainders bump the value, so non-positive ttl is untouched.
		seconds++
	}

	cmd := v.client.B().Expire().Key(key).Seconds(seconds).Build()
	if err := v.client.Do(ctx, cmd).Error(); err != nil {
		return xerrors.Internal("valkey: Expire failed").WithError(err)
	}
	return nil
}
// luaIncrWithTTL is the server-side script used by IncrWithTTL. It increments
// KEYS[1] with INCR and, only when the resulting count is 1, calls EXPIRE on
// the same key with ARGV[1] as the timeout. The script returns the count.
const luaIncrWithTTL = `local count = redis.call('INCR', KEYS[1])
if count == 1 then
redis.call('EXPIRE', KEYS[1], ARGV[1])
end
return count`
// IncrWithTTL increments the counter at key by evaluating luaIncrWithTTL and
// returns the new count. The ttl is passed to the script as its single
// argument, expressed in seconds.
//
// A positive ttl with a fractional part is rounded up to the next whole
// second. Without the rounding, a sub-second ttl would reach the script as
// "0". Zero and negative ttl values are passed through unchanged (truncated
// toward zero), exactly as before.
//
// A command or decoding failure is returned as an internal error wrapping
// the cause, together with a zero count.
func (v *vkImpl) IncrWithTTL(ctx context.Context, key string, ttl time.Duration) (int64, error) {
	seconds := int64(ttl / time.Second)
	if ttl%time.Second > 0 {
		// Only positive remainders bump the value, so non-positive ttl is untouched.
		seconds++
	}

	cmd := v.client.B().Eval().
		Script(luaIncrWithTTL).
		Numkeys(1).
		Key(key).
		Arg(strconv.FormatInt(seconds, 10)).
		Build()

	count, err := v.client.Do(ctx, cmd).AsInt64()
	if err != nil {
		return 0, xerrors.Internal("valkey: IncrWithTTL failed").WithError(err)
	}
	return count, nil
}