All checks were successful
Build and Release / release (push) Successful in 1m27s
228 lines
5.6 KiB
Go
228 lines
5.6 KiB
Go
package storage
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"net/url"
|
|
"time"
|
|
|
|
"github.com/aws/aws-sdk-go-v2/aws"
|
|
"github.com/aws/aws-sdk-go-v2/config"
|
|
"github.com/aws/aws-sdk-go-v2/credentials"
|
|
"github.com/aws/aws-sdk-go-v2/service/s3"
|
|
"github.com/aws/aws-sdk-go-v2/service/s3/types"
|
|
"github.com/rs/zerolog/log"
|
|
|
|
ffconfig "history-api/pkg/config"
|
|
)
|
|
|
|
type UploadOptions struct {
|
|
ContentType string
|
|
ContentDisposition string
|
|
Metadata map[string]string
|
|
}
|
|
|
|
type MoveOptions struct {
|
|
Bucket string
|
|
Key string
|
|
}
|
|
|
|
type Storage interface {
|
|
Upload(ctx context.Context, key string, body io.Reader, size int64, opts UploadOptions) error
|
|
Move(ctx context.Context, src *MoveOptions, dest *MoveOptions) error
|
|
PresignUpload(ctx context.Context, key string, expire time.Duration, opts UploadOptions) (string, error)
|
|
GetURL(ctx context.Context, key string, expire time.Duration) (string, error)
|
|
Delete(ctx context.Context, key string) error
|
|
BulkDelete(ctx context.Context, keys []string) error
|
|
GetMainBucket() string
|
|
GetTempBucket() string
|
|
}
|
|
|
|
type s3Storage struct {
|
|
client *s3.Client
|
|
ps *s3.PresignClient
|
|
bucket string
|
|
tempBucket string
|
|
endPoint string
|
|
}
|
|
|
|
func NewS3Storage() (Storage, error) {
|
|
accessKey, err := ffconfig.GetConfig("STORAGE_ACCESS_KEY")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
secretAccessKey, err := ffconfig.GetConfig("STORAGE_SECRET_KEY")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
bucketName, err := ffconfig.GetConfig("STORAGE_BUCKET_NAME")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tempBucketName, err := ffconfig.GetConfig("STORAGE_BUCKET_TEMP_NAME")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
endpoint, err := ffconfig.GetConfig("STORAGE_ENDPOINT")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
region, err := ffconfig.GetConfig("STORAGE_REGION")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
cfg, err := config.LoadDefaultConfig(context.TODO(),
|
|
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accessKey, secretAccessKey, "")),
|
|
config.WithRegion(region),
|
|
)
|
|
|
|
if err != nil {
|
|
log.Error().Msgf("unable to load AWS SDK config, %v", err)
|
|
return nil, err
|
|
}
|
|
|
|
client := s3.NewFromConfig(cfg, func(o *s3.Options) {
|
|
o.BaseEndpoint = aws.String(endpoint)
|
|
o.UsePathStyle = true
|
|
})
|
|
|
|
return &s3Storage{
|
|
client: client,
|
|
ps: s3.NewPresignClient(client),
|
|
bucket: bucketName,
|
|
tempBucket: tempBucketName,
|
|
endPoint: endpoint,
|
|
}, nil
|
|
}
|
|
func (s *s3Storage) GetMainBucket() string { return s.bucket }
|
|
func (s *s3Storage) GetTempBucket() string { return s.tempBucket }
|
|
|
|
func (s *s3Storage) Move(ctx context.Context, src *MoveOptions, dest *MoveOptions) error {
|
|
copySource := url.PathEscape(fmt.Sprintf("%s/%s", src.Bucket, src.Key))
|
|
|
|
_, err := s.client.CopyObject(ctx, &s3.CopyObjectInput{
|
|
Bucket: aws.String(dest.Bucket),
|
|
Key: aws.String(dest.Key),
|
|
CopySource: aws.String(copySource),
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to copy object: %w", err)
|
|
}
|
|
|
|
_, err = s.client.DeleteObject(ctx, &s3.DeleteObjectInput{
|
|
Bucket: aws.String(src.Bucket),
|
|
Key: aws.String(src.Key),
|
|
})
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("failed to delete source object after copy")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *s3Storage) Upload(ctx context.Context, key string, body io.Reader, size int64, opts UploadOptions) error {
|
|
input := &s3.PutObjectInput{
|
|
Bucket: &s.bucket,
|
|
Key: &key,
|
|
Body: body,
|
|
}
|
|
|
|
if opts.ContentType != "" {
|
|
input.ContentType = aws.String(opts.ContentType)
|
|
}
|
|
if opts.ContentDisposition != "" {
|
|
input.ContentDisposition = aws.String(opts.ContentDisposition)
|
|
}
|
|
if len(opts.Metadata) > 0 {
|
|
input.Metadata = opts.Metadata
|
|
}
|
|
|
|
_, err := s.client.PutObject(ctx, input)
|
|
return err
|
|
}
|
|
|
|
func (s *s3Storage) PresignUpload(ctx context.Context, key string, expire time.Duration, opts UploadOptions) (string, error) {
|
|
input := &s3.PutObjectInput{
|
|
Bucket: &s.tempBucket,
|
|
Key: &key,
|
|
}
|
|
|
|
if opts.ContentType != "" {
|
|
input.ContentType = aws.String(opts.ContentType)
|
|
}
|
|
if opts.ContentDisposition != "" {
|
|
input.ContentDisposition = aws.String(opts.ContentDisposition)
|
|
}
|
|
if len(opts.Metadata) > 0 {
|
|
input.Metadata = opts.Metadata
|
|
}
|
|
|
|
req, err := s.ps.PresignPutObject(ctx, input, s3.WithPresignExpires(expire))
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return req.URL, nil
|
|
}
|
|
|
|
func (s *s3Storage) GetURL(ctx context.Context, key string, expire time.Duration) (string, error) {
|
|
req, err := s.ps.PresignGetObject(ctx, &s3.GetObjectInput{
|
|
Bucket: &s.bucket,
|
|
Key: &key,
|
|
}, s3.WithPresignExpires(expire))
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return req.URL, nil
|
|
}
|
|
|
|
func (s *s3Storage) Delete(ctx context.Context, key string) error {
|
|
_, err := s.client.DeleteObject(ctx, &s3.DeleteObjectInput{
|
|
Bucket: &s.bucket,
|
|
Key: &key,
|
|
})
|
|
return err
|
|
}
|
|
|
|
func (s *s3Storage) BulkDelete(ctx context.Context, keys []string) error {
|
|
if len(keys) == 0 {
|
|
return nil
|
|
}
|
|
|
|
batchSize := 1000
|
|
var hasError bool
|
|
|
|
for i := 0; i < len(keys); i += batchSize {
|
|
end := i + batchSize
|
|
if end > len(keys) {
|
|
end = len(keys)
|
|
}
|
|
|
|
batch := keys[i:end]
|
|
var objects []types.ObjectIdentifier
|
|
for _, k := range batch {
|
|
objects = append(objects, types.ObjectIdentifier{Key: aws.String(k)})
|
|
}
|
|
|
|
_, err := s.client.DeleteObjects(ctx, &s3.DeleteObjectsInput{
|
|
Bucket: aws.String(s.bucket),
|
|
Delete: &types.Delete{Objects: objects},
|
|
})
|
|
|
|
if err != nil {
|
|
log.Error().Err(err).Int("start", i).Int("end", end).Msg("S3 batch delete failed")
|
|
hasError = true
|
|
continue
|
|
}
|
|
}
|
|
|
|
if hasError {
|
|
return fmt.Errorf("one or more batches failed to delete")
|
|
}
|
|
return nil
|
|
} |