Files
cache-valkey/new.go

142 lines
4.0 KiB
Go
Raw Permalink Normal View History

2026-05-29 15:58:56 +00:00
package cachevalkey
import (
"context"
"strconv"
"time"
vk "github.com/valkey-io/valkey-go"
"code.nochebuena.dev/einherjar/contracts/logging"
"code.nochebuena.dev/einherjar/contracts/observability"
"code.nochebuena.dev/einherjar/core/xerrors"
)
// Compile-time checks that *vkImpl satisfies Component and
// observability.Identifiable.
var (
	_ Component                  = (*vkImpl)(nil)
	_ observability.Identifiable = (*vkImpl)(nil)
)
// New builds a Component from the supplied logger and configuration.
// Only the fields are stored here; the client itself is created by OnInit.
// Register the returned value with launcher and health before starting.
func New(logger logging.Logger, cfg Config) Component {
	impl := new(vkImpl)
	impl.logger = logger
	impl.cfg = cfg
	return impl
}
// vkImpl is the Component implementation returned by New.
type vkImpl struct {
// cfg is the configuration OnInit reads to build the client.
cfg Config
// logger receives the messages emitted by OnStart and OnStop.
logger logging.Logger
// client is assigned by OnInit and is nil until then.
client vk.Client
}
// OnInit creates the valkey client from the component configuration and
// stores it on the receiver.
//
// The address list, password and database index are copied from the config.
// CacheSizeEachConn is forwarded only when positive, multiplied by 1024*1024;
// otherwise the client option is left at its zero value.
//
// It returns an internal error wrapping the cause when the client cannot be
// created, in which case the receiver's client is left unchanged.
func (v *vkImpl) OnInit() error {
	var opts vk.ClientOption
	opts.InitAddress = v.cfg.Addrs
	opts.Password = v.cfg.Password
	opts.SelectDB = v.cfg.SelectDB
	if size := v.cfg.CacheSizeEachConn; size > 0 {
		opts.CacheSizeEachConn = size * 1024 * 1024
	}

	c, err := vk.NewClient(opts)
	if err != nil {
		return xerrors.Internal("valkey: failed to create client").WithError(err)
	}

	v.client = c
	return nil
}
// OnStart verifies the client created by OnInit by sending a PING.
//
// It returns an internal error when the client has not been initialized or
// when the PING fails; on success it logs that the connection is up.
func (v *vkImpl) OnStart() error {
	if v.client == nil {
		return xerrors.Internal("valkey: client not initialized")
	}

	ping := v.client.B().Ping().Build()
	err := v.client.Do(context.Background(), ping).Error()
	if err != nil {
		return xerrors.Internal("valkey: ping failed").WithError(err)
	}

	v.logger.Info("valkey: connected")
	return nil
}
// OnStop logs the shutdown and closes the client if one was created.
// It always returns nil.
func (v *vkImpl) OnStop() error {
	v.logger.Info("valkey: closing client")
	if c := v.client; c != nil {
		c.Close()
	}
	return nil
}
// Name returns the fixed identifier "valkey" for this component.
func (v *vkImpl) Name() string {
	const name = "valkey"
	return name
}
// Priority reports observability.LevelDegraded as this component's level.
func (v *vkImpl) Priority() observability.Level {
	return observability.LevelDegraded
}
// Native exposes the underlying valkey client. The result is nil until
// OnInit has assigned one.
func (v *vkImpl) Native() vk.Client {
	return v.client
}
// HealthCheck sends a PING using ctx and returns the resulting error as-is
// (nil on success). It returns an internal error when the client has not
// been initialized.
func (v *vkImpl) HealthCheck(ctx context.Context) error {
	c := v.client
	if c == nil {
		return xerrors.Internal("valkey: client not initialized")
	}

	resp := c.Do(ctx, c.B().Ping().Build())
	return resp.Error()
}
// Get fetches the string stored at key.
//
// The boolean result is true when a value was read. A nil reply from the
// server is reported as ("", false, nil); any other failure is returned as an
// internal error wrapping the cause.
func (v *vkImpl) Get(ctx context.Context, key string) (string, bool, error) {
	cmd := v.client.B().Get().Key(key).Build()
	s, err := v.client.Do(ctx, cmd).ToString()
	switch {
	case err == nil:
		return s, true, nil
	case vk.IsValkeyNil(err):
		return "", false, nil
	default:
		return "", false, xerrors.Internal("valkey: Get failed").WithError(err)
	}
}
// Set stores value at key.
//
// When ttl is positive the key is written with an EX expiry; otherwise it is
// written without one. The EX option carries whole seconds, so a positive ttl
// is rounded up to the next full second: the key never expires earlier than
// requested, and a sub-second ttl no longer degenerates into an expiry of
// zero seconds.
//
// Any command failure is returned as an internal error wrapping the cause.
func (v *vkImpl) Set(ctx context.Context, key string, value string, ttl time.Duration) error {
	var cmd vk.Completed
	if ttl > 0 {
		// Round up to a whole number of seconds before handing the duration
		// to the EX option.
		if rem := ttl % time.Second; rem != 0 {
			ttl += time.Second - rem
		}
		cmd = v.client.B().Set().Key(key).Value(value).Ex(ttl).Build()
	} else {
		cmd = v.client.B().Set().Key(key).Value(value).Build()
	}
	if err := v.client.Do(ctx, cmd).Error(); err != nil {
		return xerrors.Internal("valkey: Set failed").WithError(err)
	}
	return nil
}
// Del removes the given keys.
//
// Calling Del with no keys is a no-op that returns nil: DEL requires at least
// one key, so sending it without arguments would only produce a server error.
// Any command failure is returned as an internal error wrapping the cause.
func (v *vkImpl) Del(ctx context.Context, keys ...string) error {
	if len(keys) == 0 {
		return nil
	}
	if err := v.client.Do(ctx, v.client.B().Del().Key(keys...).Build()).Error(); err != nil {
		return xerrors.Internal("valkey: Del failed").WithError(err)
	}
	return nil
}
// Exists reports whether key is present, i.e. whether the EXISTS reply is
// greater than zero. Any command failure is returned as an internal error
// wrapping the cause.
func (v *vkImpl) Exists(ctx context.Context, key string) (bool, error) {
	cmd := v.client.B().Exists().Key(key).Build()
	count, err := v.client.Do(ctx, cmd).AsInt64()
	if err != nil {
		return false, xerrors.Internal("valkey: Exists failed").WithError(err)
	}
	found := count > 0
	return found, nil
}
// Expire sets the time-to-live of key using EXPIRE, which takes whole
// seconds.
//
// A positive ttl is rounded up to the next full second so that a sub-second
// duration is not truncated to 0 (EXPIRE with a non-positive timeout deletes
// the key instead of scheduling an expiry). A non-positive ttl is forwarded
// unchanged, truncated toward zero.
//
// Any command failure is returned as an internal error wrapping the cause.
func (v *vkImpl) Expire(ctx context.Context, key string, ttl time.Duration) error {
	secs := int64(ttl / time.Second)
	if ttl%time.Second > 0 {
		// Positive fractional remainder: round up rather than truncate.
		secs++
	}
	if err := v.client.Do(ctx, v.client.B().Expire().Key(key).Seconds(secs).Build()).Error(); err != nil {
		return xerrors.Internal("valkey: Expire failed").WithError(err)
	}
	return nil
}
// luaIncrWithTTL is a server-side script that increments KEYS[1] and, only
// when the increment result is 1, calls EXPIRE on that key with ARGV[1].
// It returns the incremented count.
const luaIncrWithTTL = `local count = redis.call('INCR', KEYS[1])
if count == 1 then
redis.call('EXPIRE', KEYS[1], ARGV[1])
end
return count`
// IncrWithTTL increments the counter at key by running the luaIncrWithTTL
// script via EVAL and returns the new count. The script receives the ttl, in
// whole seconds, as its single argument.
//
// A positive ttl is rounded up to the next full second. Truncating instead
// would turn a sub-second ttl into 0, and EXPIRE with a non-positive timeout
// deletes the key, which would reset the counter right after each first
// increment. A non-positive ttl is forwarded unchanged, truncated toward
// zero.
//
// Any command failure is returned as an internal error wrapping the cause.
func (v *vkImpl) IncrWithTTL(ctx context.Context, key string, ttl time.Duration) (int64, error) {
	secs := int64(ttl / time.Second)
	if ttl%time.Second > 0 {
		// Positive fractional remainder: round up rather than truncate.
		secs++
	}
	result := v.client.Do(ctx, v.client.B().Eval().
		Script(luaIncrWithTTL).
		Numkeys(1).
		Key(key).
		Arg(strconv.FormatInt(secs, 10)).
		Build())
	count, err := result.AsInt64()
	if err != nil {
		return 0, xerrors.Internal("valkey: IncrWithTTL failed").WithError(err)
	}
	return count, nil
}