UPDATE: Media module
All checks were successful
Build and Release / release (push) Successful in 1m7s
All checks were successful
Build and Release / release (push) Successful in 1m7s
This commit is contained in:
@@ -10,6 +10,7 @@ import (
|
||||
_ "history-api/pkg/log"
|
||||
"history-api/pkg/mbtiles"
|
||||
"history-api/pkg/oauth"
|
||||
"history-api/pkg/storage"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
@@ -72,6 +73,12 @@ func StartServer() {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
storageClient, err := storage.NewS3Storage()
|
||||
if err != nil {
|
||||
log.Error().Msg(err.Error())
|
||||
panic(err)
|
||||
}
|
||||
|
||||
googleOAuthConfig, err := oauth.NewGoogleProvider()
|
||||
if err != nil {
|
||||
log.Error().Msg(err.Error())
|
||||
@@ -89,7 +96,7 @@ func StartServer() {
|
||||
}
|
||||
|
||||
serverHttp := NewHttpServer()
|
||||
serverHttp.SetupServer(poolPg, sqlTile, redisClient, googleOAuthConfig)
|
||||
serverHttp.SetupServer(poolPg, sqlTile, redisClient, storageClient, googleOAuthConfig)
|
||||
Singleton = serverHttp
|
||||
|
||||
done := make(chan bool, 1)
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"history-api/internal/routes"
|
||||
"history-api/internal/services"
|
||||
"history-api/pkg/cache"
|
||||
"history-api/pkg/storage"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
@@ -55,7 +56,7 @@ func NewHttpServer() *FiberServer {
|
||||
return server
|
||||
}
|
||||
|
||||
func (s *FiberServer) SetupServer(sqlPg sqlc.DBTX, sqlTile *sql.DB, redis cache.Cache, oauth *oauth2.Config) {
|
||||
func (s *FiberServer) SetupServer(sqlPg sqlc.DBTX, sqlTile *sql.DB, redis cache.Cache, sclient storage.Storage, oauth *oauth2.Config) {
|
||||
// Apply CORS middleware
|
||||
s.App.Use(cors.New(cors.Config{
|
||||
AllowOrigins: []string{
|
||||
@@ -75,22 +76,26 @@ func (s *FiberServer) SetupServer(sqlPg sqlc.DBTX, sqlTile *sql.DB, redis cache.
|
||||
roleRepo := repositories.NewRoleRepository(sqlPg, redis)
|
||||
tileRepo := repositories.NewTileRepository(sqlTile, redis)
|
||||
tokenRepo := repositories.NewTokenRepository(redis)
|
||||
mediaRepo := repositories.NewMediaRepository(sqlPg, redis)
|
||||
|
||||
// service setup
|
||||
authService := services.NewAuthService(userRepo, roleRepo, tokenRepo, redis)
|
||||
userService := services.NewUserService(userRepo, roleRepo)
|
||||
roleService := services.NewRoleService(roleRepo)
|
||||
tileService := services.NewTileService(tileRepo)
|
||||
mediaService := services.NewMediaService(mediaRepo, tokenRepo, sclient, redis)
|
||||
|
||||
// controller setup
|
||||
authController := controllers.NewAuthController(authService, oauth)
|
||||
userController := controllers.NewUserController(userService)
|
||||
userController := controllers.NewUserController(userService, mediaService)
|
||||
tileController := controllers.NewTileController(tileService)
|
||||
roleController := controllers.NewRoleController(roleService)
|
||||
mediaController := controllers.NewMediaController(mediaService)
|
||||
|
||||
// route setup
|
||||
routes.AuthRoutes(s.App, authController, userRepo)
|
||||
routes.UserRoutes(s.App, userController, userRepo)
|
||||
routes.MediaRoutes(s.App, mediaController, userRepo)
|
||||
routes.RoleRoutes(s.App, roleController, userRepo)
|
||||
routes.TileRoutes(s.App, tileController)
|
||||
routes.NotFoundRoute(s.App)
|
||||
|
||||
119
cmd/worker/storage/main.go
Normal file
119
cmd/worker/storage/main.go
Normal file
@@ -0,0 +1,119 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"history-api/internal/models"
|
||||
"history-api/pkg/cache"
|
||||
"history-api/pkg/config"
|
||||
"history-api/pkg/constants"
|
||||
_ "history-api/pkg/log"
|
||||
"history-api/pkg/storage"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
func runSingleWorker(ctx context.Context, rdb *redis.Client, consumerID int, sc storage.Storage) {
|
||||
consumerName := "worker-" + strconv.Itoa(consumerID)
|
||||
|
||||
log.Info().Str("worker", consumerName).Msg("Worker started and ready")
|
||||
|
||||
for {
|
||||
entries, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
|
||||
Group: constants.GroupStorageName,
|
||||
Consumer: consumerName,
|
||||
Streams: []string{constants.StreamStorageName, ">"},
|
||||
Count: 1,
|
||||
Block: 0,
|
||||
}).Result()
|
||||
|
||||
if err != nil {
|
||||
log.Error().Err(err).Str("worker", consumerName).Msg("Failed to read stream")
|
||||
time.Sleep(2 * time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
for _, stream := range entries {
|
||||
for _, message := range stream.Messages {
|
||||
taskType := message.Values["task_type"].(string)
|
||||
payloadStr := message.Values["payload"].(string)
|
||||
|
||||
if taskType == constants.TaskTypeDeleteMedia.String() {
|
||||
var data models.MediaStorageEntity
|
||||
if err := json.Unmarshal([]byte(payloadStr), &data); err != nil {
|
||||
log.Error().Err(err).Msg("Failed to unmarshal payload")
|
||||
continue
|
||||
}
|
||||
|
||||
log.Info().
|
||||
Str("worker", consumerName).
|
||||
Str("storage_key", data.StorageKey).
|
||||
Msg("Processing delete media task")
|
||||
|
||||
errSend := sc.Delete(ctx, data.StorageKey)
|
||||
if errSend != nil {
|
||||
log.Error().Err(errSend).Str("storage_key", data.StorageKey).Msg("Failed to delete media")
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
rdb.XAck(ctx, constants.StreamStorageName, constants.GroupStorageName, message.ID)
|
||||
log.Info().Str("msg_id", message.ID).Msg("Task acknowledged")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
func main() {
|
||||
|
||||
config.LoadEnv()
|
||||
|
||||
workerCountStr := config.GetConfigWithDefault("STORAGE_WORKER_COUNT", "1")
|
||||
workerCount, err := strconv.Atoi(workerCountStr)
|
||||
if err != nil || workerCount <= 0 {
|
||||
workerCount = 1
|
||||
}
|
||||
|
||||
cacheInterface, err := cache.NewRedisClient()
|
||||
if err != nil {
|
||||
log.Fatal().
|
||||
Err(err).
|
||||
Msg("Failed to connect to Redis")
|
||||
}
|
||||
|
||||
rdb := cacheInterface.GetRawClient()
|
||||
|
||||
sc, err := storage.NewS3Storage()
|
||||
if err != nil {
|
||||
log.Fatal().
|
||||
Err(err).
|
||||
Msg("Failed to create S3 storage client")
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
err = rdb.XGroupCreateMkStream(ctx, constants.StreamStorageName, constants.GroupStorageName, "$").Err()
|
||||
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
|
||||
log.Fatal().
|
||||
Err(err).
|
||||
Msg("Failed to create Redis Stream Group")
|
||||
}
|
||||
|
||||
log.Info().
|
||||
Int("worker_count", workerCount).
|
||||
Msg("Starting storage worker system")
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for i := 1; i <= workerCount; i++ {
|
||||
wg.Go(func() {
|
||||
runSingleWorker(ctx, rdb, i, sc)
|
||||
})
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
Reference in New Issue
Block a user