diff --git a/pkg/state/state_postgres.go b/pkg/state/state_postgres.go index 28f2678..9deadf7 100644 --- a/pkg/state/state_postgres.go +++ b/pkg/state/state_postgres.go @@ -104,6 +104,30 @@ func SetupSchemas(pool *pgxpool.Pool) { CREATE SEQUENCE IF NOT EXISTS session_id_seq; `, }, + { + name: "job_repo_id_sequences", + sql: ` + -- Create a function to get or create a sequence for each session + CREATE OR REPLACE FUNCTION get_next_job_repo_id(session_id_param INTEGER) + RETURNS INTEGER AS $$ + DECLARE + seq_name TEXT; + next_id INTEGER; + BEGIN + -- Generate sequence name based on session_id + seq_name := 'job_repo_id_seq_' || session_id_param; + + -- Create sequence if it doesn't exist + EXECUTE format('CREATE SEQUENCE IF NOT EXISTS %I START 0', seq_name); + + -- Get next value from the sequence + EXECUTE format('SELECT nextval(%L)', seq_name) INTO next_id; + + RETURN next_id; + END; + $$ LANGUAGE plpgsql; + `, + }, { name: "analyze_results", sql: ` @@ -301,14 +325,22 @@ func (s *PGState) AddJob(job queue.AnalyzeJob) { ctx := context.Background() js := job.Spec - // 1. store AnalyzeJob payload ------------------------------- + // Begin transaction for atomic operation + tx, err := s.pool.Begin(ctx) + if err != nil { + slog.Error("AddJob: failed to begin transaction", "job", js, "error", err) + panic("AddJob(): " + err.Error()) + } + defer tx.Rollback(ctx) // Will be ignored if tx.Commit() succeeds + + // 1. Store AnalyzeJob payload ------------------------------- jb, err := json.Marshal(job) if err != nil { slog.Error("AddJob: marshal failed", "job", js, "error", err) panic("AddJob(): " + err.Error()) } - _, err = s.pool.Exec(ctx, ` + _, err = tx.Exec(ctx, ` INSERT INTO analyze_jobs (session_id, owner, repo, payload) VALUES ($1, $2, $3, $4) ON CONFLICT DO NOTHING @@ -318,18 +350,18 @@ func (s *PGState) AddJob(job queue.AnalyzeJob) { panic("AddJob(): " + err.Error()) } - // 2. determine next job_repo_id for this session ------------ + // 2. Get next job_repo_id atomically using sequence -------- var nextID int - err = s.pool.QueryRow(ctx, ` - SELECT COUNT(*) FROM job_repo_map WHERE session_id = $1 + err = tx.QueryRow(ctx, ` + SELECT get_next_job_repo_id($1) `, js.SessionID).Scan(&nextID) if err != nil { - slog.Error("AddJob: lookup next job_repo_id failed", "session", js.SessionID, "error", err) + slog.Error("AddJob: get_next_job_repo_id failed", "session", js.SessionID, "error", err) panic("AddJob(): " + err.Error()) } - // 3. save mapping (ignore duplicate rows) ------------------- - _, err = s.pool.Exec(ctx, ` + // 3. Save mapping with the atomically generated ID --------- + _, err = tx.Exec(ctx, ` INSERT INTO job_repo_map (session_id, job_repo_id, owner, repo) VALUES ($1, $2, $3, $4) ON CONFLICT DO NOTHING @@ -340,6 +372,12 @@ func (s *PGState) AddJob(job queue.AnalyzeJob) { panic("AddJob(): " + err.Error()) } + // Commit the transaction + if err = tx.Commit(ctx); err != nil { + slog.Error("AddJob: failed to commit transaction", "job", js, "error", err) + panic("AddJob(): " + err.Error()) + } + slog.Debug("AddJob stored", "session", js.SessionID, "jobRepoId", nextID, "owner", js.Owner, "repo", js.Repo) }