UPDATE: Change auth logic
All checks were successful
Build and Release / release (push) Successful in 1m27s
All checks were successful
Build and Release / release (push) Successful in 1m27s
This commit is contained in:
@@ -42,7 +42,14 @@ func runSingleWorker(ctx context.Context, rdb *redis.Client, consumerID int) {
|
||||
for _, message := range stream.Messages {
|
||||
taskType := message.Values["task_type"].(string)
|
||||
payloadStr := message.Values["payload"].(string)
|
||||
|
||||
taskType, ok1 := message.Values["task_type"].(string)
|
||||
payloadStr, ok2 := message.Values["payload"].(string)
|
||||
if !ok1 || !ok2 {
|
||||
log.Error().Msg("Invalid message format")
|
||||
rdb.XAck(ctx, constants.StreamEmailName, constants.GroupEmailName, message.ID)
|
||||
continue
|
||||
}
|
||||
|
||||
if taskType == constants.TaskTypeSendEmailOTP.String() {
|
||||
var data models.TokenEntity
|
||||
if err := json.Unmarshal([]byte(payloadStr), &data); err != nil {
|
||||
|
||||
@@ -40,8 +40,13 @@ func runSingleWorker(ctx context.Context, rdb *redis.Client, consumerID int, sc
|
||||
|
||||
for _, stream := range entries {
|
||||
for _, message := range stream.Messages {
|
||||
taskType := message.Values["task_type"].(string)
|
||||
payloadStr := message.Values["payload"].(string)
|
||||
taskType, ok1 := message.Values["task_type"].(string)
|
||||
payloadStr, ok2 := message.Values["payload"].(string)
|
||||
if !ok1 || !ok2 {
|
||||
log.Error().Msg("Invalid message format")
|
||||
rdb.XAck(ctx, constants.StreamStorageName, constants.GroupStorageName, message.ID)
|
||||
continue
|
||||
}
|
||||
|
||||
if taskType == constants.TaskTypeDeleteMedia.String() {
|
||||
var data models.MediaStorageEntity
|
||||
@@ -62,6 +67,27 @@ func runSingleWorker(ctx context.Context, rdb *redis.Client, consumerID int, sc
|
||||
}
|
||||
}
|
||||
|
||||
if taskType == constants.TaskTypeBulkDeleteMedia.String() {
|
||||
var data []*models.MediaStorageEntity
|
||||
if err := json.Unmarshal([]byte(payloadStr), &data); err != nil {
|
||||
log.Error().Err(err).Msg("Failed to unmarshal payload")
|
||||
continue
|
||||
}
|
||||
storageKeys := make([]string, len(data))
|
||||
for i, item := range data {
|
||||
storageKeys[i] = item.StorageKey
|
||||
}
|
||||
log.Info().
|
||||
Str("worker", consumerName).
|
||||
Int("count", len(storageKeys)).
|
||||
Msg("Processing bulk delete media task")
|
||||
errSend := sc.BulkDelete(ctx, storageKeys)
|
||||
if errSend != nil {
|
||||
log.Error().Err(errSend).Msg("Failed to bulk delete")
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
rdb.XAck(ctx, constants.StreamStorageName, constants.GroupStorageName, message.ID)
|
||||
log.Info().Str("msg_id", message.ID).Msg("Task acknowledged")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user