package cachevalkey import ( "context" "strconv" "time" vk "github.com/valkey-io/valkey-go" "code.nochebuena.dev/einherjar/contracts/logging" "code.nochebuena.dev/einherjar/contracts/observability" "code.nochebuena.dev/einherjar/core/xerrors" ) var _ Component = (*vkImpl)(nil) var _ observability.Identifiable = (*vkImpl)(nil) // New returns a Component backed by the given config. // Register the returned value with launcher and health before starting. func New(logger logging.Logger, cfg Config) Component { return &vkImpl{cfg: cfg, logger: logger} } type vkImpl struct { cfg Config logger logging.Logger client vk.Client } func (v *vkImpl) OnInit() error { opts := vk.ClientOption{ InitAddress: v.cfg.Addrs, Password: v.cfg.Password, SelectDB: v.cfg.SelectDB, } if v.cfg.CacheSizeEachConn > 0 { opts.CacheSizeEachConn = v.cfg.CacheSizeEachConn * 1024 * 1024 } client, err := vk.NewClient(opts) if err != nil { return xerrors.Internal("valkey: failed to create client").WithError(err) } v.client = client return nil } func (v *vkImpl) OnStart() error { if v.client == nil { return xerrors.Internal("valkey: client not initialized") } if err := v.client.Do(context.Background(), v.client.B().Ping().Build()).Error(); err != nil { return xerrors.Internal("valkey: ping failed").WithError(err) } v.logger.Info("valkey: connected") return nil } func (v *vkImpl) OnStop() error { v.logger.Info("valkey: closing client") if v.client != nil { v.client.Close() } return nil } func (v *vkImpl) Name() string { return "valkey" } func (v *vkImpl) Priority() observability.Level { return observability.LevelDegraded } func (v *vkImpl) Native() vk.Client { return v.client } func (v *vkImpl) HealthCheck(ctx context.Context) error { if v.client == nil { return xerrors.Internal("valkey: client not initialized") } return v.client.Do(ctx, v.client.B().Ping().Build()).Error() } func (v *vkImpl) Get(ctx context.Context, key string) (string, bool, error) { val, err := v.client.Do(ctx, v.client.B().Get().Key(key).Build()).ToString() if err != nil { if vk.IsValkeyNil(err) { return "", false, nil } return "", false, xerrors.Internal("valkey: Get failed").WithError(err) } return val, true, nil } func (v *vkImpl) Set(ctx context.Context, key string, value string, ttl time.Duration) error { var cmd vk.Completed if ttl > 0 { cmd = v.client.B().Set().Key(key).Value(value).Ex(ttl).Build() } else { cmd = v.client.B().Set().Key(key).Value(value).Build() } if err := v.client.Do(ctx, cmd).Error(); err != nil { return xerrors.Internal("valkey: Set failed").WithError(err) } return nil } func (v *vkImpl) Del(ctx context.Context, keys ...string) error { if err := v.client.Do(ctx, v.client.B().Del().Key(keys...).Build()).Error(); err != nil { return xerrors.Internal("valkey: Del failed").WithError(err) } return nil } func (v *vkImpl) Exists(ctx context.Context, key string) (bool, error) { n, err := v.client.Do(ctx, v.client.B().Exists().Key(key).Build()).AsInt64() if err != nil { return false, xerrors.Internal("valkey: Exists failed").WithError(err) } return n > 0, nil } func (v *vkImpl) Expire(ctx context.Context, key string, ttl time.Duration) error { if err := v.client.Do(ctx, v.client.B().Expire().Key(key).Seconds(int64(ttl.Seconds())).Build()).Error(); err != nil { return xerrors.Internal("valkey: Expire failed").WithError(err) } return nil } const luaIncrWithTTL = `local count = redis.call('INCR', KEYS[1]) if count == 1 then redis.call('EXPIRE', KEYS[1], ARGV[1]) end return count` func (v *vkImpl) IncrWithTTL(ctx context.Context, key string, ttl time.Duration) (int64, error) { result := v.client.Do(ctx, v.client.B().Eval(). Script(luaIncrWithTTL). Numkeys(1). Key(key). Arg(strconv.FormatInt(int64(ttl.Seconds()), 10)). Build()) count, err := result.AsInt64() if err != nil { return 0, xerrors.Internal("valkey: IncrWithTTL failed").WithError(err) } return count, nil }