feat: implement RAG indexing worker with vector database schema and document embedding support
Build and Release / release (push) Successful in 1m24s
Build and Release / release (push) Successful in 1m24s
This commit is contained in:
+18
-5
@@ -19,13 +19,26 @@ import (
|
|||||||
|
|
||||||
func runStatistics(ctx context.Context, repo repositories.StatisticRepository) {
|
func runStatistics(ctx context.Context, repo repositories.StatisticRepository) {
|
||||||
log.Info().Msg("Running daily statistics...")
|
log.Info().Msg("Running daily statistics...")
|
||||||
today := time.Now().Truncate(24 * time.Hour)
|
|
||||||
_, err := repo.Upsert(ctx, today)
|
loc, err := time.LoadLocation("Asia/Ho_Chi_Minh")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error().Err(err).Msg("Failed to upsert system statistics")
|
log.Warn().Err(err).Msg("Failed to load Asia/Ho_Chi_Minh timezone, falling back to fixed UTC+7")
|
||||||
} else {
|
loc = time.FixedZone("ICT", 7*3600)
|
||||||
log.Info().Msg("Successfully updated daily statistics and cleared cache")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
now := time.Now().In(loc)
|
||||||
|
today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
|
||||||
|
|
||||||
|
// Upsert stats for today, yesterday, and the day before to prevent timezone gaps/delays
|
||||||
|
for i := 0; i < 3; i++ {
|
||||||
|
date := today.AddDate(0, 0, -i)
|
||||||
|
log.Info().Str("date", date.Format("2006-01-02")).Msg("Upserting system statistics")
|
||||||
|
_, err = repo.Upsert(ctx, date)
|
||||||
|
if err != nil {
|
||||||
|
log.Error().Err(err).Str("date", date.Format("2006-01-02")).Msg("Failed to upsert system statistics")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.Info().Msg("Successfully updated daily statistics and cleared cache")
|
||||||
}
|
}
|
||||||
|
|
||||||
func runBackup(ctx context.Context, s3 storage.Storage, dbURI string) {
|
func runBackup(ctx context.Context, s3 storage.Storage, dbURI string) {
|
||||||
|
|||||||
+25
-41
@@ -53,33 +53,24 @@ func processRagTask(ctx context.Context, ragRepo repositories.RagRepository, rag
|
|||||||
var vectors [][]float32
|
var vectors [][]float32
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
for attempt := 0; attempt <= maxRetries; attempt++ {
|
for attempt := 0; ; attempt++ {
|
||||||
if attempt > 0 {
|
|
||||||
delay := baseRetryDelay * time.Duration(math.Pow(2, float64(attempt-1)))
|
|
||||||
log.Warn().
|
|
||||||
Str("worker", workerName).
|
|
||||||
Str("wiki_id", wiki.ID).
|
|
||||||
Int("attempt", attempt).
|
|
||||||
Dur("delay", delay).
|
|
||||||
Msg("Retrying wiki embedding")
|
|
||||||
time.Sleep(delay)
|
|
||||||
}
|
|
||||||
|
|
||||||
chunks, vectors, err = ragUtils.PrepareChunks(ctx, cleanText)
|
chunks, vectors, err = ragUtils.PrepareChunks(ctx, cleanText)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
delay := baseRetryDelay * time.Duration(math.Pow(2, float64(attempt)))
|
||||||
|
if delay > 2*time.Minute {
|
||||||
|
delay = 2 * time.Minute
|
||||||
|
}
|
||||||
|
|
||||||
log.Error().Err(err).
|
log.Error().Err(err).
|
||||||
Str("worker", workerName).
|
Str("worker", workerName).
|
||||||
Str("wiki_id", wiki.ID).
|
Str("wiki_id", wiki.ID).
|
||||||
Int("attempt", attempt).
|
Int("attempt", attempt+1).
|
||||||
Msg("Failed to prepare wiki chunks")
|
Dur("retry_delay", delay).
|
||||||
}
|
Msg("Failed to prepare wiki chunks, retrying...")
|
||||||
|
time.Sleep(delay)
|
||||||
if err != nil {
|
|
||||||
log.Error().Err(err).Str("worker", workerName).Str("wiki_id", wiki.ID).Msg("Giving up on wiki after max retries")
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_ = ragRepo.DeleteBySourceIDs(ctx, "wiki", []string{wiki.ID})
|
_ = ragRepo.DeleteBySourceIDs(ctx, "wiki", []string{wiki.ID})
|
||||||
@@ -106,33 +97,24 @@ func processRagTask(ctx context.Context, ragRepo repositories.RagRepository, rag
|
|||||||
var vectors [][]float32
|
var vectors [][]float32
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
for attempt := 0; attempt <= maxRetries; attempt++ {
|
for attempt := 0; ; attempt++ {
|
||||||
if attempt > 0 {
|
|
||||||
delay := baseRetryDelay * time.Duration(math.Pow(2, float64(attempt-1)))
|
|
||||||
log.Warn().
|
|
||||||
Str("worker", workerName).
|
|
||||||
Str("entity_id", entity.ID).
|
|
||||||
Int("attempt", attempt).
|
|
||||||
Dur("delay", delay).
|
|
||||||
Msg("Retrying entity embedding")
|
|
||||||
time.Sleep(delay)
|
|
||||||
}
|
|
||||||
|
|
||||||
chunks, vectors, err = ragUtils.PrepareChunks(ctx, cleanText)
|
chunks, vectors, err = ragUtils.PrepareChunks(ctx, cleanText)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
delay := baseRetryDelay * time.Duration(math.Pow(2, float64(attempt)))
|
||||||
|
if delay > 2*time.Minute {
|
||||||
|
delay = 2 * time.Minute
|
||||||
|
}
|
||||||
|
|
||||||
log.Error().Err(err).
|
log.Error().Err(err).
|
||||||
Str("worker", workerName).
|
Str("worker", workerName).
|
||||||
Str("entity_id", entity.ID).
|
Str("entity_id", entity.ID).
|
||||||
Int("attempt", attempt).
|
Int("attempt", attempt+1).
|
||||||
Msg("Failed to prepare entity chunks")
|
Dur("retry_delay", delay).
|
||||||
}
|
Msg("Failed to prepare entity chunks, retrying...")
|
||||||
|
time.Sleep(delay)
|
||||||
if err != nil {
|
|
||||||
log.Error().Err(err).Str("worker", workerName).Str("entity_id", entity.ID).Msg("Giving up on entity after max retries")
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_ = ragRepo.DeleteBySourceIDs(ctx, "entity", []string{entity.ID})
|
_ = ragRepo.DeleteBySourceIDs(ctx, "entity", []string{entity.ID})
|
||||||
@@ -253,9 +235,11 @@ func main() {
|
|||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
for i := 1; i <= workerCount; i++ {
|
for i := 1; i <= workerCount; i++ {
|
||||||
wg.Go(func() {
|
wg.Add(1)
|
||||||
runSingleWorker(ctx, rdb, i, ragRepo, ragUtils)
|
go func(workerID int) {
|
||||||
})
|
defer wg.Done()
|
||||||
|
runSingleWorker(ctx, rdb, workerID, ragRepo, ragUtils)
|
||||||
|
}(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|||||||
@@ -7,13 +7,14 @@ CREATE TABLE IF NOT EXISTS rag_chunks (
|
|||||||
project_id UUID REFERENCES projects(id) ON DELETE CASCADE,
|
project_id UUID REFERENCES projects(id) ON DELETE CASCADE,
|
||||||
chunk_index INT NOT NULL,
|
chunk_index INT NOT NULL,
|
||||||
content TEXT NOT NULL,
|
content TEXT NOT NULL,
|
||||||
embedding vector(3072),
|
embedding vector(1536),
|
||||||
created_at TIMESTAMPTZ DEFAULT now(),
|
created_at TIMESTAMPTZ DEFAULT now(),
|
||||||
updated_at TIMESTAMPTZ DEFAULT now()
|
updated_at TIMESTAMPTZ DEFAULT now()
|
||||||
);
|
);
|
||||||
|
|
||||||
CREATE INDEX idx_rag_chunks_source ON rag_chunks(source_type, source_id);
|
CREATE INDEX idx_rag_chunks_source ON rag_chunks(source_type, source_id);
|
||||||
CREATE INDEX idx_rag_chunks_project ON rag_chunks(project_id);
|
CREATE INDEX idx_rag_chunks_project ON rag_chunks(project_id);
|
||||||
|
CREATE INDEX idx_rag_chunks_embedding_hnsw ON rag_chunks USING hnsw (embedding vector_cosine_ops);
|
||||||
|
|
||||||
CREATE TRIGGER trigger_rag_chunks_updated_at
|
CREATE TRIGGER trigger_rag_chunks_updated_at
|
||||||
BEFORE UPDATE ON rag_chunks
|
BEFORE UPDATE ON rag_chunks
|
||||||
|
|||||||
+26
-12
@@ -10,7 +10,7 @@ import (
|
|||||||
|
|
||||||
"github.com/tmc/langchaingo/embeddings"
|
"github.com/tmc/langchaingo/embeddings"
|
||||||
"github.com/tmc/langchaingo/llms"
|
"github.com/tmc/langchaingo/llms"
|
||||||
"github.com/tmc/langchaingo/llms/googleai"
|
"github.com/tmc/langchaingo/llms/openai"
|
||||||
"github.com/tmc/langchaingo/textsplitter"
|
"github.com/tmc/langchaingo/textsplitter"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -20,28 +20,29 @@ type RagUtils struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewRagUtils() (*RagUtils, error) {
|
func NewRagUtils() (*RagUtils, error) {
|
||||||
googleAIApiKey, err := config.GetConfig("GOOGLE_AI_API_KEY")
|
openRouterAPIKey, err := config.GetConfig("OPEN_ROUTER_API")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
googleModal, err := config.GetConfig("GOOGLE_AI_MODEL")
|
model, err := config.GetConfig("OPEN_ROUTER_MODEL")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
googleModal = "gemma-4-26b-a4b-it"
|
model = "qwen/qwen3.5-flash-02-23"
|
||||||
}
|
}
|
||||||
|
|
||||||
googleEmbeddingModel, err := config.GetConfig("GOOGLE_AI_EMBEDDING_MODEL")
|
embeddingModel, err := config.GetConfig("OPEN_ROUTER_EMBEDDING_MODEL")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
googleEmbeddingModel = "gemini-embedding-001"
|
embeddingModel = "qwen/qwen3-embedding-8b"
|
||||||
}
|
}
|
||||||
|
|
||||||
llm, err := googleai.New(context.Background(),
|
llm, err := openai.New(
|
||||||
googleai.WithAPIKey(googleAIApiKey),
|
openai.WithToken(openRouterAPIKey),
|
||||||
googleai.WithDefaultModel(googleModal),
|
openai.WithBaseURL("https://openrouter.ai/api/v1"),
|
||||||
googleai.WithDefaultEmbeddingModel(googleEmbeddingModel),
|
openai.WithModel(model),
|
||||||
|
openai.WithEmbeddingModel(embeddingModel),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to init google ai: %w", err)
|
return nil, fmt.Errorf("failed to init openrouter ai: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
embedder, err := embeddings.NewEmbedder(llm)
|
embedder, err := embeddings.NewEmbedder(llm)
|
||||||
@@ -77,6 +78,13 @@ func (u *RagUtils) PrepareChunks(ctx context.Context, text string) ([]string, []
|
|||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Truncate to 1536 dimensions for pgvector compatibility (HNSW index limit is 2000)
|
||||||
|
for i := range vectors {
|
||||||
|
if len(vectors[i]) > 1536 {
|
||||||
|
vectors[i] = vectors[i][:1536]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return chunks, vectors, nil
|
return chunks, vectors, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -85,7 +93,13 @@ func (u *RagUtils) EmbedQuery(ctx context.Context, query string) ([]float32, err
|
|||||||
if err != nil || len(vectors) == 0 {
|
if err != nil || len(vectors) == 0 {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return vectors[0], nil
|
|
||||||
|
vector := vectors[0]
|
||||||
|
if len(vector) > 1536 {
|
||||||
|
vector = vector[:1536]
|
||||||
|
}
|
||||||
|
|
||||||
|
return vector, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *RagUtils) GenerateResponse(ctx context.Context, prompt string) (string, error) {
|
func (u *RagUtils) GenerateResponse(ctx context.Context, prompt string) (string, error) {
|
||||||
|
|||||||
Reference in New Issue
Block a user