From 7559667ce281eac4aa7ff418793058f489987fdb Mon Sep 17 00:00:00 2001 From: AzenKain Date: Thu, 9 Apr 2026 09:32:34 +0700 Subject: [PATCH] UPDATE: Change auth logic --- cmd/worker/email/main.go | 9 +++- cmd/worker/storage/main.go | 30 ++++++++++++- db/query/files.sql | 4 ++ db/query/verification.sql | 24 ++++++---- docs/docs.go | 57 ++++++++++++++++++++++++ docs/swagger.json | 57 ++++++++++++++++++++++++ docs/swagger.yaml | 36 +++++++++++++++ internal/controllers/authController.go | 21 ++++++++- internal/controllers/mediaController.go | 51 +++++++++++++++++++++ internal/dtos/request/media.go | 4 ++ internal/gen/sqlc/files.sql.go | 10 +++++ internal/middlewares/jwtMiddleware.go | 36 ++++++++++++++- internal/models/media.go | 9 ++++ internal/repositories/mediaRepository.go | 23 ++++++++++ internal/routes/authRoute.go | 2 +- internal/routes/mediaRoute.go | 7 ++- internal/services/authService.go | 9 +++- internal/services/mediaService.go | 34 ++++++++++++++ pkg/constants/task.go | 5 ++- pkg/storage/s3.go | 40 +++++++++++++++++ 20 files changed, 448 insertions(+), 20 deletions(-) diff --git a/cmd/worker/email/main.go b/cmd/worker/email/main.go index 109ec85..36eb92d 100644 --- a/cmd/worker/email/main.go +++ b/cmd/worker/email/main.go @@ -42,7 +42,14 @@ func runSingleWorker(ctx context.Context, rdb *redis.Client, consumerID int) { for _, message := range stream.Messages { taskType := message.Values["task_type"].(string) payloadStr := message.Values["payload"].(string) - + taskType, ok1 := message.Values["task_type"].(string) + payloadStr, ok2 := message.Values["payload"].(string) + if !ok1 || !ok2 { + log.Error().Msg("Invalid message format") + rdb.XAck(ctx, constants.StreamEmailName, constants.GroupEmailName, message.ID) + continue + } + if taskType == constants.TaskTypeSendEmailOTP.String() { var data models.TokenEntity if err := json.Unmarshal([]byte(payloadStr), &data); err != nil { diff --git a/cmd/worker/storage/main.go b/cmd/worker/storage/main.go index 50f984b..072b68f 100644 --- a/cmd/worker/storage/main.go +++ b/cmd/worker/storage/main.go @@ -40,8 +40,13 @@ func runSingleWorker(ctx context.Context, rdb *redis.Client, consumerID int, sc for _, stream := range entries { for _, message := range stream.Messages { - taskType := message.Values["task_type"].(string) - payloadStr := message.Values["payload"].(string) + taskType, ok1 := message.Values["task_type"].(string) + payloadStr, ok2 := message.Values["payload"].(string) + if !ok1 || !ok2 { + log.Error().Msg("Invalid message format") + rdb.XAck(ctx, constants.StreamStorageName, constants.GroupStorageName, message.ID) + continue + } if taskType == constants.TaskTypeDeleteMedia.String() { var data models.MediaStorageEntity @@ -62,6 +67,27 @@ func runSingleWorker(ctx context.Context, rdb *redis.Client, consumerID int, sc } } + if taskType == constants.TaskTypeBulkDeleteMedia.String() { + var data []*models.MediaStorageEntity + if err := json.Unmarshal([]byte(payloadStr), &data); err != nil { + log.Error().Err(err).Msg("Failed to unmarshal payload") + continue + } + storageKeys := make([]string, len(data)) + for i, item := range data { + storageKeys[i] = item.StorageKey + } + log.Info(). + Str("worker", consumerName). + Int("count", len(storageKeys)). + Msg("Processing bulk delete media task") + errSend := sc.BulkDelete(ctx, storageKeys) + if errSend != nil { + log.Error().Err(errSend).Msg("Failed to bulk delete") + continue + } + } + rdb.XAck(ctx, constants.StreamStorageName, constants.GroupStorageName, message.ID) log.Info().Str("msg_id", message.ID).Msg("Task acknowledged") } diff --git a/db/query/files.sql b/db/query/files.sql index bb6ae0c..6c4048d 100644 --- a/db/query/files.sql +++ b/db/query/files.sql @@ -10,6 +10,10 @@ RETURNING *; DELETE FROM medias WHERE id = $1; +-- name: DeleteMedias :exec +DELETE FROM medias +WHERE id = ANY($1::uuid[]); + -- name: SearchMedias :many SELECT id, user_id, storage_key, original_name, mime_type, size, file_metadata, created_at, updated_at diff --git a/db/query/verification.sql b/db/query/verification.sql index 9a89a5e..7183562 100644 --- a/db/query/verification.sql +++ b/db/query/verification.sql @@ -6,13 +6,6 @@ INSERT INTO user_verifications ( ) RETURNING *; --- name: CreateVerificationMedia :exec -INSERT INTO verification_medias ( - verification_id, media_id -) VALUES ( - $1, $2 -); - -- name: GetUserVerificationByID :one SELECT uv.id, @@ -93,4 +86,19 @@ WHERE id = $1; -- name: DeleteVerificationMedia :exec DELETE FROM verification_medias -WHERE verification_id = $1 AND media_id = $2; \ No newline at end of file +WHERE verification_id = $1 AND media_id = $2; + +-- name: CreateVerificationMedia :exec +INSERT INTO verification_medias ( + verification_id, media_id +) VALUES ( + $1, $2 +); + +-- name: DeleteAllVerificationMedias :exec +DELETE FROM verification_medias +WHERE verification_id = $1; + +-- name: BulkDeleteVerificationMedias :exec +DELETE FROM verification_medias +WHERE verification_id = $1 AND media_id = ANY($2::uuid[]); \ No newline at end of file diff --git a/docs/docs.go b/docs/docs.go index 06f7b9d..da7bb5d 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -452,6 +452,49 @@ const docTemplate = `{ } } } + }, + "delete": { + "security": [ + { + "BearerAuth": [] + } + ], + "description": "Delete multiple media files by IDs", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "Media" + ], + "summary": "Delete media", + "parameters": [ + { + "description": "Media IDs to delete", + "name": "body", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/history-api_internal_dtos_request.MediaBulkDeleteDto" + } + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/history-api_internal_dtos_response.CommonResponse" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/history-api_internal_dtos_response.CommonResponse" + } + } + } } }, "/media/presigned": { @@ -1460,6 +1503,20 @@ const docTemplate = `{ } } }, + "history-api_internal_dtos_request.MediaBulkDeleteDto": { + "type": "object", + "required": [ + "media_ids" + ], + "properties": { + "media_ids": { + "type": "array", + "items": { + "type": "string" + } + } + } + }, "history-api_internal_dtos_request.SignInDto": { "type": "object", "required": [ diff --git a/docs/swagger.json b/docs/swagger.json index 8b0c2c0..22652ef 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -445,6 +445,49 @@ } } } + }, + "delete": { + "security": [ + { + "BearerAuth": [] + } + ], + "description": "Delete multiple media files by IDs", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "Media" + ], + "summary": "Delete media", + "parameters": [ + { + "description": "Media IDs to delete", + "name": "body", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/history-api_internal_dtos_request.MediaBulkDeleteDto" + } + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/history-api_internal_dtos_response.CommonResponse" + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "$ref": "#/definitions/history-api_internal_dtos_response.CommonResponse" + } + } + } } }, "/media/presigned": { @@ -1453,6 +1496,20 @@ } } }, + "history-api_internal_dtos_request.MediaBulkDeleteDto": { + "type": "object", + "required": [ + "media_ids" + ], + "properties": { + "media_ids": { + "type": "array", + "items": { + "type": "string" + } + } + } + }, "history-api_internal_dtos_request.SignInDto": { "type": "object", "required": [ diff --git a/docs/swagger.yaml b/docs/swagger.yaml index a9c9cf9..3e4cc7e 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -60,6 +60,15 @@ definitions: - new_password - token_id type: object + history-api_internal_dtos_request.MediaBulkDeleteDto: + properties: + media_ids: + items: + type: string + type: array + required: + - media_ids + type: object history-api_internal_dtos_request.SignInDto: properties: email: @@ -443,6 +452,33 @@ paths: tags: - Auth /media: + delete: + consumes: + - application/json + description: Delete multiple media files by IDs + parameters: + - description: Media IDs to delete + in: body + name: body + required: true + schema: + $ref: '#/definitions/history-api_internal_dtos_request.MediaBulkDeleteDto' + produces: + - application/json + responses: + "200": + description: OK + schema: + $ref: '#/definitions/history-api_internal_dtos_response.CommonResponse' + "500": + description: Internal Server Error + schema: + $ref: '#/definitions/history-api_internal_dtos_response.CommonResponse' + security: + - BearerAuth: [] + summary: Delete media + tags: + - Media get: consumes: - application/json diff --git a/internal/controllers/authController.go b/internal/controllers/authController.go index df68f45..6504a41 100644 --- a/internal/controllers/authController.go +++ b/internal/controllers/authController.go @@ -9,6 +9,7 @@ import ( "history-api/internal/models" "history-api/internal/services" "history-api/pkg/validator" + "strings" "time" "github.com/gofiber/fiber/v3" @@ -136,6 +137,16 @@ func (h *AuthController) Signup(c fiber.Ctx) error { }) } + +func (h *AuthController) getRefreshToken(c fiber.Ctx) string { + auth := c.Get("Authorization") + if auth != "" { + return strings.TrimPrefix(auth, "Bearer ") + } + + return c.Cookies("refresh_token") +} + // RefreshToken godoc // @Summary Refresh session tokens // @Description Generate a new access token using a valid refresh token from context @@ -151,7 +162,15 @@ func (h *AuthController) RefreshToken(c fiber.Ctx) error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - res, err := h.service.RefreshToken(ctx, c.Locals("uid").(string)) + tokenJwt := h.getRefreshToken(c) + if tokenJwt == "" { + return c.Status(fiber.StatusUnauthorized).JSON(response.CommonResponse{ + Status: false, + Message: "Missing refresh token", + }) + } + + res, err := h.service.RefreshToken(ctx, c.Locals("uid").(string), tokenJwt) if err != nil { return c.Status(fiber.StatusInternalServerError).JSON(response.CommonResponse{ Status: false, diff --git a/internal/controllers/mediaController.go b/internal/controllers/mediaController.go index 22867ef..237abb1 100644 --- a/internal/controllers/mediaController.go +++ b/internal/controllers/mediaController.go @@ -124,6 +124,57 @@ func (m *MediaController) DeleteMedia(c fiber.Ctx) error { }) } +// BulkDeleteMedia godoc +// @Summary Delete media +// @Description Delete multiple media files by IDs +// @Tags Media +// @Accept json +// @Produce json +// @Security BearerAuth +// @Param body body request.MediaBulkDeleteDto true "Media IDs to delete" +// @Success 200 {object} response.CommonResponse +// @Failure 500 {object} response.CommonResponse +// @Router /media [delete] +func (m *MediaController) BulkDeleteMedia(c fiber.Ctx) error { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + claimsVal := c.Locals("user_claims") + if claimsVal == nil { + return c.Status(fiber.StatusUnauthorized).JSON(response.CommonResponse{ + Status: false, + Message: "Unauthorized", + }) + } + + claims, ok := claimsVal.(*response.JWTClaims) + if !ok { + return c.Status(fiber.StatusUnauthorized).JSON(response.CommonResponse{ + Status: false, + Message: "Invalid user claims", + }) + } + + dto := &request.MediaBulkDeleteDto{} + if err := validator.ValidateBodyDto(c, dto); err != nil { + return c.Status(fiber.StatusBadRequest).JSON(response.CommonResponse{ + Status: false, + Message: err.Error(), + }) + } + + err := m.service.BulkDeleteMedia(ctx, claims, dto) + if err != nil { + return c.Status(fiber.StatusInternalServerError).JSON(response.CommonResponse{ + Status: false, + Message: err.Error(), + }) + } + return c.Status(fiber.StatusOK).JSON(response.CommonResponse{ + Status: true, + Message: "Media deleted successfully", + }) +} + // UploadServerSide godoc // @Summary Upload media (server-side) // @Description Upload media file through server diff --git a/internal/dtos/request/media.go b/internal/dtos/request/media.go index b670324..9bcd2de 100644 --- a/internal/dtos/request/media.go +++ b/internal/dtos/request/media.go @@ -19,3 +19,7 @@ type SearchMediaDto struct { MinSize *int64 `json:"min_size" query:"min_size" validate:"omitempty,min=0"` MaxSize *int64 `json:"max_size" query:"max_size" validate:"omitempty,min=0,gtefield=MinSize"` } + +type MediaBulkDeleteDto struct { + MediaIDs []string `json:"media_ids" validate:"required,dive,uuid"` +} diff --git a/internal/gen/sqlc/files.sql.go b/internal/gen/sqlc/files.sql.go index 92a229f..9701012 100644 --- a/internal/gen/sqlc/files.sql.go +++ b/internal/gen/sqlc/files.sql.go @@ -100,6 +100,16 @@ func (q *Queries) DeleteMedia(ctx context.Context, id pgtype.UUID) error { return err } +const deleteMedias = `-- name: DeleteMedias :exec +DELETE FROM medias +WHERE id = ANY($1::uuid[]) +` + +func (q *Queries) DeleteMedias(ctx context.Context, dollar_1 []pgtype.UUID) error { + _, err := q.db.Exec(ctx, deleteMedias, dollar_1) + return err +} + const getMediaByID = `-- name: GetMediaByID :one SELECT id, user_id, storage_key, original_name, mime_type, size, file_metadata, created_at, updated_at FROM medias WHERE id = $1 diff --git a/internal/middlewares/jwtMiddleware.go b/internal/middlewares/jwtMiddleware.go index 4b4ade7..ee48610 100644 --- a/internal/middlewares/jwtMiddleware.go +++ b/internal/middlewares/jwtMiddleware.go @@ -31,7 +31,7 @@ func JwtAccess(userRepo repositories.UserRepository) fiber.Handler { }) } -func JwtRefresh(userRepo repositories.UserRepository) fiber.Handler { +func JwtRefresh() fiber.Handler { jwtRefreshSecret, err := config.GetConfig("JWT_REFRESH_SECRET") if err != nil { return nil @@ -40,7 +40,7 @@ func JwtRefresh(userRepo repositories.UserRepository) fiber.Handler { return jwtware.New(jwtware.Config{ SigningKey: jwtware.SigningKey{Key: []byte(jwtRefreshSecret)}, ErrorHandler: jwtError, - SuccessHandler: jwtSuccess(userRepo), + SuccessHandler: jwtSuccessRefresh(), Extractor: extractors.Chain( extractors.FromAuthHeader("Bearer"), extractors.FromCookie("refresh_token"), @@ -100,6 +100,38 @@ func jwtSuccess(userRepo repositories.UserRepository) fiber.Handler { } } +func jwtSuccessRefresh() fiber.Handler { + return func(c fiber.Ctx) error { + unauthorized := func() error { + return c.Status(fiber.StatusUnauthorized).JSON(response.CommonResponse{ + Status: false, + Message: "Invalid or missing token", + }) + } + + jwtToken := jwtware.FromContext(c) + if jwtToken == nil { + return unauthorized() + } + + claims, ok := jwtToken.Claims.(*response.JWTClaims) + if !ok { + return unauthorized() + } + + if slices.Contains(claims.Roles, constants.BANNED) { + return c.Status(fiber.StatusForbidden).JSON(response.CommonResponse{ + Status: false, + Message: "User account is banned", + }) + } + + c.Locals("uid", claims.UId) + c.Locals("user_claims", claims) + return c.Next() + } +} + func jwtError(c fiber.Ctx, err error) error { if err.Error() == "Missing or malformed JWT" { return c.Status(fiber.StatusBadRequest). diff --git a/internal/models/media.go b/internal/models/media.go index a673b47..02e8c93 100644 --- a/internal/models/media.go +++ b/internal/models/media.go @@ -50,3 +50,12 @@ func MediaEntitiesToResponse(entities []*MediaEntity) []*response.MediaResponse } return responses } + + +func MediaEntitiesToStorageEntitye(entities []*MediaEntity) []*MediaStorageEntity { + responses := make([]*MediaStorageEntity, len(entities)) + for i, entity := range entities { + responses[i] = entity.ToStorageEntity() + } + return responses +} \ No newline at end of file diff --git a/internal/repositories/mediaRepository.go b/internal/repositories/mediaRepository.go index 1f6e238..0290c3e 100644 --- a/internal/repositories/mediaRepository.go +++ b/internal/repositories/mediaRepository.go @@ -16,10 +16,12 @@ import ( type MediaRepository interface { GetByID(ctx context.Context, id pgtype.UUID) (*models.MediaEntity, error) + GetByIDs(ctx context.Context, ids []string) ([]*models.MediaEntity, error) GetByUserID(ctx context.Context, userId pgtype.UUID) ([]*models.MediaEntity, error) Search(ctx context.Context, params sqlc.SearchMediasParams) ([]*models.MediaEntity, error) Count(ctx context.Context, params sqlc.CountMediasParams) (int64, error) Delete(ctx context.Context, id pgtype.UUID) error + BulkDelete(ctx context.Context, ids []pgtype.UUID) error Create(ctx context.Context, params sqlc.CreateMediaParams) (*models.MediaEntity, error) } @@ -81,6 +83,10 @@ func (r *mediaRepository) getByIDsWithFallback(ctx context.Context, ids []string return medias, nil } +func (r *mediaRepository) GetByIDs(ctx context.Context, ids []string) ([]*models.MediaEntity, error) { + return r.getByIDsWithFallback(ctx, ids) +} + func (r *mediaRepository) GetByID(ctx context.Context, id pgtype.UUID) (*models.MediaEntity, error) { cacheId := fmt.Sprintf("media:id:%s", convert.UUIDToString(id)) var media models.MediaEntity @@ -152,6 +158,23 @@ func (r *mediaRepository) Delete(ctx context.Context, id pgtype.UUID) error { return nil } +func (r *mediaRepository) BulkDelete(ctx context.Context, ids []pgtype.UUID) error { + if len(ids) == 0 { + return nil + } + err := r.q.DeleteMedias(ctx, ids) + if err != nil { + return err + } + keys := make([]string, len(ids)) + for i, id := range ids { + keys[i] = fmt.Sprintf("media:id:%s", convert.UUIDToString(id)) + } + _ = r.c.Del(ctx, keys...) + + return nil +} + func (r *mediaRepository) Search(ctx context.Context, params sqlc.SearchMediasParams) ([]*models.MediaEntity, error) { queryKey := r.generateQueryKey("media:search", params) var cachedIDs []string diff --git a/internal/routes/authRoute.go b/internal/routes/authRoute.go index ea2d367..8d1d406 100644 --- a/internal/routes/authRoute.go +++ b/internal/routes/authRoute.go @@ -12,7 +12,7 @@ func AuthRoutes(app *fiber.App, controller *controllers.AuthController, userRepo route := app.Group("/auth") route.Post("/signin", controller.Signin) route.Post("/signup", controller.Signup) - route.Post("/refresh", middlewares.JwtRefresh(userRepo), controller.RefreshToken) + route.Post("/refresh", middlewares.JwtRefresh(), controller.RefreshToken) route.Post("/token/create", controller.CreateToken) route.Post("/token/verify", controller.VerifyToken) route.Post("/forgot-password", controller.ForgotPassword) diff --git a/internal/routes/mediaRoute.go b/internal/routes/mediaRoute.go index b61947f..f1c3c0c 100644 --- a/internal/routes/mediaRoute.go +++ b/internal/routes/mediaRoute.go @@ -17,7 +17,12 @@ func MediaRoutes(app *fiber.App, controller *controllers.MediaController, userRe middlewares.RequireAnyRole(constants.ADMIN, constants.MOD), controller.SearchMedia, ) - + route.Delete( + "/", + middlewares.JwtAccess(userRepo), + controller.BulkDeleteMedia, + ) + route.Post( "/upload", middlewares.JwtAccess(userRepo), diff --git a/internal/services/authService.go b/internal/services/authService.go index df11522..c81e68a 100644 --- a/internal/services/authService.go +++ b/internal/services/authService.go @@ -38,7 +38,7 @@ type AuthService interface { VerifyToken(ctx context.Context, dto *request.VerifyTokenDto) (*response.VerifyTokenResponse, error) CreateToken(ctx context.Context, dto *request.CreateTokenDto) error SigninWithGoogle(ctx context.Context, dto *request.SigninWithGoogleDto) (*response.AuthResponse, error) - RefreshToken(ctx context.Context, id string) (*response.AuthResponse, error) + RefreshToken(ctx context.Context, id string, refreshToken string) (*response.AuthResponse, error) } type authService struct { @@ -203,7 +203,7 @@ func (a *authService) Logout(ctx context.Context, userId string) error { return nil } -func (a *authService) RefreshToken(ctx context.Context, id string) (*response.AuthResponse, error) { +func (a *authService) RefreshToken(ctx context.Context, id string, refreshToken string) (*response.AuthResponse, error) { var pgID pgtype.UUID err := pgID.Scan(id) if err != nil { @@ -213,6 +213,11 @@ func (a *authService) RefreshToken(ctx context.Context, id string) (*response.Au if err != nil { return nil, fiber.NewError(fiber.StatusInternalServerError, "Invalid user data") } + + if user.RefreshToken != refreshToken { + return nil, fiber.NewError(fiber.StatusUnauthorized, "Invalid refresh token") + } + roles := models.RolesEntityToRoleConstant(user.Roles) if slices.Contains(roles, constants.BANNED) { diff --git a/internal/services/mediaService.go b/internal/services/mediaService.go index f32736e..0d0a675 100644 --- a/internal/services/mediaService.go +++ b/internal/services/mediaService.go @@ -32,6 +32,7 @@ type MediaService interface { GetMediaByUserID(ctx context.Context, userId string) ([]*response.MediaResponse, error) SearchMedia(ctx context.Context, dto *request.SearchMediaDto) (*response.PaginatedResponse, error) DeleteMedia(ctx context.Context, claims *response.JWTClaims, mediaId string) error + BulkDeleteMedia(ctx context.Context, claims *response.JWTClaims, dto *request.MediaBulkDeleteDto) error UploadServerSide(ctx context.Context, userId string, fileHeader *multipart.FileHeader) (*response.MediaResponse, error) GeneratePresignedURL(ctx context.Context, userId string, dto *request.PreSignedDto) (*response.PreSignedResponse, error) PreSignedCompleted(ctx context.Context, userId string, dto *request.PreSignedCompleteDto) (*response.MediaResponse, error) @@ -88,6 +89,39 @@ func (m *mediaService) DeleteMedia(ctx context.Context, claims *response.JWTClai return nil } +func (m *mediaService) BulkDeleteMedia(ctx context.Context, claims *response.JWTClaims, dto *request.MediaBulkDeleteDto) error { + listMedia, err := m.mediaRepo.GetByIDs(ctx, dto.MediaIDs) + if err != nil { + return fiber.NewError(fiber.StatusInternalServerError, err.Error()) + } + shoudDelete := false + if slices.Contains(claims.Roles, constants.ADMIN) || slices.Contains(claims.Roles, constants.MOD) { + shoudDelete = true + } + listMediaIds := make([]pgtype.UUID, len(listMedia)) + listMediaStorageEntities := make([]*models.MediaStorageEntity, len(listMedia)) + for _, media := range listMedia { + if media.UserID != claims.UId && !shoudDelete { + return fiber.NewError(fiber.StatusForbidden, "You don't have permission to delete this media") + } + id, err := convert.StringToUUID(media.ID) + if err != nil { + return fiber.NewError(fiber.StatusInternalServerError, err.Error()) + } + listMediaIds = append(listMediaIds, id) + listMediaStorageEntities = append(listMediaStorageEntities, media.ToStorageEntity()) + } + + err = m.mediaRepo.BulkDelete(ctx, listMediaIds) + if err != nil { + return fiber.NewError(fiber.StatusInternalServerError, err.Error()) + } + + m.c.PublishTask(ctx, constants.StreamStorageName, constants.TaskTypeBulkDeleteMedia, listMediaStorageEntities) + + return nil +} + func (m *mediaService) GetMediaByID(ctx context.Context, id string) (*response.MediaResponse, error) { mediaId, err := convert.StringToUUID(id) if err != nil { diff --git a/pkg/constants/task.go b/pkg/constants/task.go index 7303a96..269aac3 100644 --- a/pkg/constants/task.go +++ b/pkg/constants/task.go @@ -3,8 +3,9 @@ package constants type TaskType string const ( - TaskTypeSendEmailOTP TaskType = "SEND_EMAIL_OTP" - TaskTypeDeleteMedia TaskType = "DELETE_MEDIA" + TaskTypeSendEmailOTP TaskType = "SEND_EMAIL_OTP" + TaskTypeDeleteMedia TaskType = "DELETE_MEDIA" + TaskTypeBulkDeleteMedia TaskType = "BULK_DELETE_MEDIA" ) func (t TaskType) String() string { diff --git a/pkg/storage/s3.go b/pkg/storage/s3.go index e831139..0ea1cad 100644 --- a/pkg/storage/s3.go +++ b/pkg/storage/s3.go @@ -11,6 +11,7 @@ import ( "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/rs/zerolog/log" ffconfig "history-api/pkg/config" @@ -33,6 +34,7 @@ type Storage interface { PresignUpload(ctx context.Context, key string, expire time.Duration, opts UploadOptions) (string, error) GetURL(ctx context.Context, key string, expire time.Duration) (string, error) Delete(ctx context.Context, key string) error + BulkDelete(ctx context.Context, keys []string) error GetMainBucket() string GetTempBucket() string } @@ -186,3 +188,41 @@ func (s *s3Storage) Delete(ctx context.Context, key string) error { }) return err } + +func (s *s3Storage) BulkDelete(ctx context.Context, keys []string) error { + if len(keys) == 0 { + return nil + } + + batchSize := 1000 + var hasError bool + + for i := 0; i < len(keys); i += batchSize { + end := i + batchSize + if end > len(keys) { + end = len(keys) + } + + batch := keys[i:end] + var objects []types.ObjectIdentifier + for _, k := range batch { + objects = append(objects, types.ObjectIdentifier{Key: aws.String(k)}) + } + + _, err := s.client.DeleteObjects(ctx, &s3.DeleteObjectsInput{ + Bucket: aws.String(s.bucket), + Delete: &types.Delete{Objects: objects}, + }) + + if err != nil { + log.Error().Err(err).Int("start", i).Int("end", end).Msg("S3 batch delete failed") + hasError = true + continue + } + } + + if hasError { + return fmt.Errorf("one or more batches failed to delete") + } + return nil +} \ No newline at end of file