feat(storage-minio): initial implementation — MinIO/S3 object storage with lifecycle (v1.0.0)
Introduces code.nochebuena.dev/einherjar/storage-minio — the object storage starter for the Einherjar framework. Absorbs the minio package from micro-lib, replacing fmt.Errorf wrapping with core/xerrors. Interfaces (CT-6: one TypeSpec per file): - Provider — PutObject, RemoveObject, GetObject, PresignedGetObject, HandleError - Component — lifecycle.Component + observability.Checkable + Provider + Native() Implementation: - New(logger, cfg) Component — client not created until OnInit - OnInit: minio.New with credentials and transport; bucket existence check - OnStart: BucketExists PING; logs "minio: connected" - OnStop: logs "minio: closing client" (minio-go is stateless; no explicit close) - HealthCheck: BucketExists check; Priority LevelCritical - Native() *miniogo.Client — escape hatch for operations not in Provider - HandleError: maps minio-go errors to xerrors (NotFound, AlreadyExists, Internal) Config (EINHERJAR_MINIO_* env vars): Endpoint(required), AccessKey(required), SecretKey(required), Bucket(required), UseSSL(false), Region(us-east-1) - Component interface embeds observability.Identifiable; identifiable.go implements ModulePath and ModuleVersion via runtime/debug.ReadBuildInfo() — prints in launcher banner
This commit is contained in:
115
new.go
Normal file
115
new.go
Normal file
@@ -0,0 +1,115 @@
|
||||
package minio
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
miniogo "github.com/minio/minio-go/v7"
|
||||
"github.com/minio/minio-go/v7/pkg/credentials"
|
||||
|
||||
"code.nochebuena.dev/einherjar/contracts/logging"
|
||||
"code.nochebuena.dev/einherjar/contracts/observability"
|
||||
"code.nochebuena.dev/einherjar/core/xerrors"
|
||||
)
|
||||
|
||||
var _ Component = (*minioImpl)(nil)
|
||||
var _ observability.Identifiable = (*minioImpl)(nil)
|
||||
|
||||
// New returns a Component backed by the given config.
|
||||
// Register the returned value with the launcher and health aggregator before starting.
|
||||
func New(logger logging.Logger, cfg Config) Component {
|
||||
return &minioImpl{cfg: cfg, logger: logger}
|
||||
}
|
||||
|
||||
type minioImpl struct {
|
||||
cfg Config
|
||||
logger logging.Logger
|
||||
mc *miniogo.Client
|
||||
}
|
||||
|
||||
func (c *minioImpl) OnInit() error {
|
||||
opts := &miniogo.Options{
|
||||
Creds: credentials.NewStaticV4(c.cfg.AccessKey, c.cfg.SecretKey, ""),
|
||||
Secure: c.cfg.UseSSL,
|
||||
Region: c.cfg.Region,
|
||||
Transport: c.cfg.Transport,
|
||||
}
|
||||
mc, err := miniogo.New(c.cfg.Endpoint, opts)
|
||||
if err != nil {
|
||||
return xerrors.Internal("minio: create client").WithError(err)
|
||||
}
|
||||
c.mc = mc
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *minioImpl) OnStart() error {
|
||||
if c.mc == nil {
|
||||
return xerrors.Internal("minio: client not initialized")
|
||||
}
|
||||
exists, err := c.mc.BucketExists(context.Background(), c.cfg.Bucket)
|
||||
if err != nil {
|
||||
return xerrors.Internal("minio: check bucket").
|
||||
WithContext("bucket", c.cfg.Bucket).WithError(err)
|
||||
}
|
||||
if !exists {
|
||||
if err := c.mc.MakeBucket(context.Background(), c.cfg.Bucket, miniogo.MakeBucketOptions{}); err != nil {
|
||||
return xerrors.Internal("minio: create bucket").
|
||||
WithContext("bucket", c.cfg.Bucket).WithError(err)
|
||||
}
|
||||
c.logger.Info("minio: bucket created", "bucket", c.cfg.Bucket)
|
||||
}
|
||||
c.logger.Info("minio: connected", "bucket", c.cfg.Bucket)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *minioImpl) OnStop() error {
|
||||
c.mc = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *minioImpl) Name() string { return "minio" }
|
||||
func (c *minioImpl) Priority() observability.Level { return observability.LevelCritical }
|
||||
func (c *minioImpl) Native() *miniogo.Client { return c.mc }
|
||||
|
||||
func (c *minioImpl) HealthCheck(ctx context.Context) error {
|
||||
if c.mc == nil {
|
||||
return xerrors.Internal("minio: client not initialized")
|
||||
}
|
||||
_, err := c.mc.BucketExists(ctx, c.cfg.Bucket)
|
||||
if err != nil {
|
||||
return xerrors.Internal("minio: health check").WithError(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *minioImpl) PutObject(ctx context.Context, bucket, key string, reader io.Reader, size int64, opts miniogo.PutObjectOptions) (miniogo.UploadInfo, error) {
|
||||
info, err := c.mc.PutObject(ctx, bucket, key, reader, size, opts)
|
||||
if err != nil {
|
||||
return miniogo.UploadInfo{}, HandleError(err)
|
||||
}
|
||||
return info, nil
|
||||
}
|
||||
|
||||
func (c *minioImpl) RemoveObject(ctx context.Context, bucket, key string, opts miniogo.RemoveObjectOptions) error {
|
||||
return HandleError(c.mc.RemoveObject(ctx, bucket, key, opts))
|
||||
}
|
||||
|
||||
func (c *minioImpl) GetObject(ctx context.Context, bucket, key string, opts miniogo.GetObjectOptions) (*miniogo.Object, error) {
|
||||
obj, err := c.mc.GetObject(ctx, bucket, key, opts)
|
||||
if err != nil {
|
||||
return nil, HandleError(err)
|
||||
}
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
func (c *minioImpl) PresignedGetObject(ctx context.Context, bucket, key string, expires time.Duration, reqParams url.Values) (*url.URL, error) {
|
||||
u, err := c.mc.PresignedGetObject(ctx, bucket, key, expires, reqParams)
|
||||
if err != nil {
|
||||
return nil, HandleError(err)
|
||||
}
|
||||
return u, nil
|
||||
}
|
||||
|
||||
func (c *minioImpl) HandleError(err error) error { return HandleError(err) }
|
||||
Reference in New Issue
Block a user