Introduces code.nochebuena.dev/einherjar/cache-valkey — the Valkey cache starter for the Einherjar framework. Absorbs the valkey package from micro-lib and adds three duck-typed adapters that wire directly into auth, web, and auth-jwt. Core: - Provider interface — Get, Set, Del, Exists, Expire, IncrWithTTL, Native() - Component interface — lifecycle.Component + observability.Checkable + Provider - Config struct (EINHERJAR_VALKEY_* env vars) - New(logger, cfg) Component — creates valkey-go client in OnInit; PING in OnStart; logs "valkey: connected" - IncrWithTTL implemented with Lua script (atomic INCR + conditional EXPIRE); race-free fixed-window semantics with no MULTI/EXEC overhead - Priority: LevelDegraded — Valkey outage degrades, does not halt the service Adapters (duck-typed — no import of auth, web, or auth-jwt): - PermissionCache — Get/Set int64 bitmasks as string; satisfies auth/rbac.Cache - RateLimiterStore — fixed-window via IncrWithTTL; satisfies web/mw.RateLimiterStore - Blacklist — Exists/Set "1" with TTL; satisfies auth-jwt.Blacklist Compliance test verifies CT-6, duck-type shape assignments, and full adapter behavioural coverage with a mockProvider (no live server required). - Component interface embeds observability.Identifiable; identifiable.go implements ModulePath and ModuleVersion via runtime/debug.ReadBuildInfo() — prints in launcher banner
142 lines
4.0 KiB
Go
142 lines
4.0 KiB
Go
package cachevalkey
|
|
|
|
import (
|
|
"context"
|
|
"strconv"
|
|
"time"
|
|
|
|
vk "github.com/valkey-io/valkey-go"
|
|
|
|
"code.nochebuena.dev/einherjar/contracts/logging"
|
|
"code.nochebuena.dev/einherjar/contracts/observability"
|
|
"code.nochebuena.dev/einherjar/core/xerrors"
|
|
)
|
|
|
|
var _ Component = (*vkImpl)(nil)
|
|
var _ observability.Identifiable = (*vkImpl)(nil)
|
|
|
|
// New returns a Component backed by the given config.
|
|
// Register the returned value with launcher and health before starting.
|
|
func New(logger logging.Logger, cfg Config) Component {
|
|
return &vkImpl{cfg: cfg, logger: logger}
|
|
}
|
|
|
|
type vkImpl struct {
|
|
cfg Config
|
|
logger logging.Logger
|
|
client vk.Client
|
|
}
|
|
|
|
func (v *vkImpl) OnInit() error {
|
|
opts := vk.ClientOption{
|
|
InitAddress: v.cfg.Addrs,
|
|
Password: v.cfg.Password,
|
|
SelectDB: v.cfg.SelectDB,
|
|
}
|
|
if v.cfg.CacheSizeEachConn > 0 {
|
|
opts.CacheSizeEachConn = v.cfg.CacheSizeEachConn * 1024 * 1024
|
|
}
|
|
client, err := vk.NewClient(opts)
|
|
if err != nil {
|
|
return xerrors.Internal("valkey: failed to create client").WithError(err)
|
|
}
|
|
v.client = client
|
|
return nil
|
|
}
|
|
|
|
func (v *vkImpl) OnStart() error {
|
|
if v.client == nil {
|
|
return xerrors.Internal("valkey: client not initialized")
|
|
}
|
|
if err := v.client.Do(context.Background(), v.client.B().Ping().Build()).Error(); err != nil {
|
|
return xerrors.Internal("valkey: ping failed").WithError(err)
|
|
}
|
|
v.logger.Info("valkey: connected")
|
|
return nil
|
|
}
|
|
|
|
func (v *vkImpl) OnStop() error {
|
|
v.logger.Info("valkey: closing client")
|
|
if v.client != nil {
|
|
v.client.Close()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (v *vkImpl) Name() string { return "valkey" }
|
|
func (v *vkImpl) Priority() observability.Level { return observability.LevelDegraded }
|
|
func (v *vkImpl) Native() vk.Client { return v.client }
|
|
|
|
func (v *vkImpl) HealthCheck(ctx context.Context) error {
|
|
if v.client == nil {
|
|
return xerrors.Internal("valkey: client not initialized")
|
|
}
|
|
return v.client.Do(ctx, v.client.B().Ping().Build()).Error()
|
|
}
|
|
|
|
func (v *vkImpl) Get(ctx context.Context, key string) (string, bool, error) {
|
|
val, err := v.client.Do(ctx, v.client.B().Get().Key(key).Build()).ToString()
|
|
if err != nil {
|
|
if vk.IsValkeyNil(err) {
|
|
return "", false, nil
|
|
}
|
|
return "", false, xerrors.Internal("valkey: Get failed").WithError(err)
|
|
}
|
|
return val, true, nil
|
|
}
|
|
|
|
func (v *vkImpl) Set(ctx context.Context, key string, value string, ttl time.Duration) error {
|
|
var cmd vk.Completed
|
|
if ttl > 0 {
|
|
cmd = v.client.B().Set().Key(key).Value(value).Ex(ttl).Build()
|
|
} else {
|
|
cmd = v.client.B().Set().Key(key).Value(value).Build()
|
|
}
|
|
if err := v.client.Do(ctx, cmd).Error(); err != nil {
|
|
return xerrors.Internal("valkey: Set failed").WithError(err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (v *vkImpl) Del(ctx context.Context, keys ...string) error {
|
|
if err := v.client.Do(ctx, v.client.B().Del().Key(keys...).Build()).Error(); err != nil {
|
|
return xerrors.Internal("valkey: Del failed").WithError(err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (v *vkImpl) Exists(ctx context.Context, key string) (bool, error) {
|
|
n, err := v.client.Do(ctx, v.client.B().Exists().Key(key).Build()).AsInt64()
|
|
if err != nil {
|
|
return false, xerrors.Internal("valkey: Exists failed").WithError(err)
|
|
}
|
|
return n > 0, nil
|
|
}
|
|
|
|
func (v *vkImpl) Expire(ctx context.Context, key string, ttl time.Duration) error {
|
|
if err := v.client.Do(ctx, v.client.B().Expire().Key(key).Seconds(int64(ttl.Seconds())).Build()).Error(); err != nil {
|
|
return xerrors.Internal("valkey: Expire failed").WithError(err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
const luaIncrWithTTL = `local count = redis.call('INCR', KEYS[1])
|
|
if count == 1 then
|
|
redis.call('EXPIRE', KEYS[1], ARGV[1])
|
|
end
|
|
return count`
|
|
|
|
func (v *vkImpl) IncrWithTTL(ctx context.Context, key string, ttl time.Duration) (int64, error) {
|
|
result := v.client.Do(ctx, v.client.B().Eval().
|
|
Script(luaIncrWithTTL).
|
|
Numkeys(1).
|
|
Key(key).
|
|
Arg(strconv.FormatInt(int64(ttl.Seconds()), 10)).
|
|
Build())
|
|
count, err := result.AsInt64()
|
|
if err != nil {
|
|
return 0, xerrors.Internal("valkey: IncrWithTTL failed").WithError(err)
|
|
}
|
|
return count, nil
|
|
}
|