diff --git a/cmd/worker/cron/main.go b/cmd/worker/cron/main.go index ca60469..4cef376 100644 --- a/cmd/worker/cron/main.go +++ b/cmd/worker/cron/main.go @@ -19,13 +19,26 @@ import ( func runStatistics(ctx context.Context, repo repositories.StatisticRepository) { log.Info().Msg("Running daily statistics...") - today := time.Now().Truncate(24 * time.Hour) - _, err := repo.Upsert(ctx, today) + + loc, err := time.LoadLocation("Asia/Ho_Chi_Minh") if err != nil { - log.Error().Err(err).Msg("Failed to upsert system statistics") - } else { - log.Info().Msg("Successfully updated daily statistics and cleared cache") + log.Warn().Err(err).Msg("Failed to load Asia/Ho_Chi_Minh timezone, falling back to fixed UTC+7") + loc = time.FixedZone("ICT", 7*3600) } + + now := time.Now().In(loc) + today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC) + + // Upsert stats for today, yesterday, and the day before to prevent timezone gaps/delays + for i := 0; i < 3; i++ { + date := today.AddDate(0, 0, -i) + log.Info().Str("date", date.Format("2006-01-02")).Msg("Upserting system statistics") + _, err = repo.Upsert(ctx, date) + if err != nil { + log.Error().Err(err).Str("date", date.Format("2006-01-02")).Msg("Failed to upsert system statistics") + } + } + log.Info().Msg("Successfully updated daily statistics and cleared cache") } func runBackup(ctx context.Context, s3 storage.Storage, dbURI string) { diff --git a/cmd/worker/rag/main.go b/cmd/worker/rag/main.go index fcfb298..d5c442a 100644 --- a/cmd/worker/rag/main.go +++ b/cmd/worker/rag/main.go @@ -53,33 +53,24 @@ func processRagTask(ctx context.Context, ragRepo repositories.RagRepository, rag var vectors [][]float32 var err error - for attempt := 0; attempt <= maxRetries; attempt++ { - if attempt > 0 { - delay := baseRetryDelay * time.Duration(math.Pow(2, float64(attempt-1))) - log.Warn(). - Str("worker", workerName). - Str("wiki_id", wiki.ID). - Int("attempt", attempt). - Dur("delay", delay). - Msg("Retrying wiki embedding") - time.Sleep(delay) - } - + for attempt := 0; ; attempt++ { chunks, vectors, err = ragUtils.PrepareChunks(ctx, cleanText) if err == nil { break } + delay := baseRetryDelay * time.Duration(math.Pow(2, float64(attempt))) + if delay > 2*time.Minute { + delay = 2 * time.Minute + } + log.Error().Err(err). Str("worker", workerName). Str("wiki_id", wiki.ID). - Int("attempt", attempt). - Msg("Failed to prepare wiki chunks") - } - - if err != nil { - log.Error().Err(err).Str("worker", workerName).Str("wiki_id", wiki.ID).Msg("Giving up on wiki after max retries") - continue + Int("attempt", attempt+1). + Dur("retry_delay", delay). + Msg("Failed to prepare wiki chunks, retrying...") + time.Sleep(delay) } _ = ragRepo.DeleteBySourceIDs(ctx, "wiki", []string{wiki.ID}) @@ -106,33 +97,24 @@ func processRagTask(ctx context.Context, ragRepo repositories.RagRepository, rag var vectors [][]float32 var err error - for attempt := 0; attempt <= maxRetries; attempt++ { - if attempt > 0 { - delay := baseRetryDelay * time.Duration(math.Pow(2, float64(attempt-1))) - log.Warn(). - Str("worker", workerName). - Str("entity_id", entity.ID). - Int("attempt", attempt). - Dur("delay", delay). - Msg("Retrying entity embedding") - time.Sleep(delay) - } - + for attempt := 0; ; attempt++ { chunks, vectors, err = ragUtils.PrepareChunks(ctx, cleanText) if err == nil { break } + delay := baseRetryDelay * time.Duration(math.Pow(2, float64(attempt))) + if delay > 2*time.Minute { + delay = 2 * time.Minute + } + log.Error().Err(err). Str("worker", workerName). Str("entity_id", entity.ID). - Int("attempt", attempt). - Msg("Failed to prepare entity chunks") - } - - if err != nil { - log.Error().Err(err).Str("worker", workerName).Str("entity_id", entity.ID).Msg("Giving up on entity after max retries") - continue + Int("attempt", attempt+1). + Dur("retry_delay", delay). + Msg("Failed to prepare entity chunks, retrying...") + time.Sleep(delay) } _ = ragRepo.DeleteBySourceIDs(ctx, "entity", []string{entity.ID}) @@ -253,9 +235,11 @@ func main() { var wg sync.WaitGroup for i := 1; i <= workerCount; i++ { - wg.Go(func() { - runSingleWorker(ctx, rdb, i, ragRepo, ragUtils) - }) + wg.Add(1) + go func(workerID int) { + defer wg.Done() + runSingleWorker(ctx, rdb, workerID, ragRepo, ragUtils) + }(i) } wg.Wait() diff --git a/db/migrations/0000013_rag_documents.up.sql b/db/migrations/0000013_rag_documents.up.sql index e2f4ea1..62174f2 100644 --- a/db/migrations/0000013_rag_documents.up.sql +++ b/db/migrations/0000013_rag_documents.up.sql @@ -7,13 +7,14 @@ CREATE TABLE IF NOT EXISTS rag_chunks ( project_id UUID REFERENCES projects(id) ON DELETE CASCADE, chunk_index INT NOT NULL, content TEXT NOT NULL, - embedding vector(3072), + embedding vector(1536), created_at TIMESTAMPTZ DEFAULT now(), updated_at TIMESTAMPTZ DEFAULT now() ); CREATE INDEX idx_rag_chunks_source ON rag_chunks(source_type, source_id); CREATE INDEX idx_rag_chunks_project ON rag_chunks(project_id); +CREATE INDEX idx_rag_chunks_embedding_hnsw ON rag_chunks USING hnsw (embedding vector_cosine_ops); CREATE TRIGGER trigger_rag_chunks_updated_at BEFORE UPDATE ON rag_chunks diff --git a/pkg/ai/rag.go b/pkg/ai/rag.go index c2476dd..bab19e6 100644 --- a/pkg/ai/rag.go +++ b/pkg/ai/rag.go @@ -10,7 +10,7 @@ import ( "github.com/tmc/langchaingo/embeddings" "github.com/tmc/langchaingo/llms" - "github.com/tmc/langchaingo/llms/googleai" + "github.com/tmc/langchaingo/llms/openai" "github.com/tmc/langchaingo/textsplitter" ) @@ -20,28 +20,29 @@ type RagUtils struct { } func NewRagUtils() (*RagUtils, error) { - googleAIApiKey, err := config.GetConfig("GOOGLE_AI_API_KEY") + openRouterAPIKey, err := config.GetConfig("OPEN_ROUTER_API") if err != nil { return nil, err } - googleModal, err := config.GetConfig("GOOGLE_AI_MODEL") + model, err := config.GetConfig("OPEN_ROUTER_MODEL") if err != nil { - googleModal = "gemma-4-26b-a4b-it" + model = "qwen/qwen3.5-flash-02-23" } - googleEmbeddingModel, err := config.GetConfig("GOOGLE_AI_EMBEDDING_MODEL") + embeddingModel, err := config.GetConfig("OPEN_ROUTER_EMBEDDING_MODEL") if err != nil { - googleEmbeddingModel = "gemini-embedding-001" + embeddingModel = "qwen/qwen3-embedding-8b" } - llm, err := googleai.New(context.Background(), - googleai.WithAPIKey(googleAIApiKey), - googleai.WithDefaultModel(googleModal), - googleai.WithDefaultEmbeddingModel(googleEmbeddingModel), + llm, err := openai.New( + openai.WithToken(openRouterAPIKey), + openai.WithBaseURL("https://openrouter.ai/api/v1"), + openai.WithModel(model), + openai.WithEmbeddingModel(embeddingModel), ) if err != nil { - return nil, fmt.Errorf("failed to init google ai: %w", err) + return nil, fmt.Errorf("failed to init openrouter ai: %w", err) } embedder, err := embeddings.NewEmbedder(llm) @@ -77,6 +78,13 @@ func (u *RagUtils) PrepareChunks(ctx context.Context, text string) ([]string, [] return nil, nil, err } + // Truncate to 1536 dimensions for pgvector compatibility (HNSW index limit is 2000) + for i := range vectors { + if len(vectors[i]) > 1536 { + vectors[i] = vectors[i][:1536] + } + } + return chunks, vectors, nil } @@ -85,7 +93,13 @@ func (u *RagUtils) EmbedQuery(ctx context.Context, query string) ([]float32, err if err != nil || len(vectors) == 0 { return nil, err } - return vectors[0], nil + + vector := vectors[0] + if len(vector) > 1536 { + vector = vector[:1536] + } + + return vector, nil } func (u *RagUtils) GenerateResponse(ctx context.Context, prompt string) (string, error) {