fix: use postgres to generate gap/concurrency/race-safe job_repo_it
This takes the place of the mutex used by the in-memory version
This commit is contained in:
@@ -104,6 +104,30 @@ func SetupSchemas(pool *pgxpool.Pool) {
|
||||
CREATE SEQUENCE IF NOT EXISTS session_id_seq;
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "job_repo_id_sequences",
|
||||
sql: `
|
||||
-- Create a function to get or create a sequence for each session
|
||||
CREATE OR REPLACE FUNCTION get_next_job_repo_id(session_id_param INTEGER)
|
||||
RETURNS INTEGER AS $$
|
||||
DECLARE
|
||||
seq_name TEXT;
|
||||
next_id INTEGER;
|
||||
BEGIN
|
||||
-- Generate sequence name based on session_id
|
||||
seq_name := 'job_repo_id_seq_' || session_id_param;
|
||||
|
||||
-- Create sequence if it doesn't exist
|
||||
EXECUTE format('CREATE SEQUENCE IF NOT EXISTS %I START 0', seq_name);
|
||||
|
||||
-- Get next value from the sequence
|
||||
EXECUTE format('SELECT nextval(%L)', seq_name) INTO next_id;
|
||||
|
||||
RETURN next_id;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
`,
|
||||
},
|
||||
{
|
||||
name: "analyze_results",
|
||||
sql: `
|
||||
@@ -301,14 +325,22 @@ func (s *PGState) AddJob(job queue.AnalyzeJob) {
|
||||
ctx := context.Background()
|
||||
js := job.Spec
|
||||
|
||||
// 1. store AnalyzeJob payload -------------------------------
|
||||
// Begin transaction for atomic operation
|
||||
tx, err := s.pool.Begin(ctx)
|
||||
if err != nil {
|
||||
slog.Error("AddJob: failed to begin transaction", "job", js, "error", err)
|
||||
panic("AddJob(): " + err.Error())
|
||||
}
|
||||
defer tx.Rollback(ctx) // Will be ignored if tx.Commit() succeeds
|
||||
|
||||
// 1. Store AnalyzeJob payload -------------------------------
|
||||
jb, err := json.Marshal(job)
|
||||
if err != nil {
|
||||
slog.Error("AddJob: marshal failed", "job", js, "error", err)
|
||||
panic("AddJob(): " + err.Error())
|
||||
}
|
||||
|
||||
_, err = s.pool.Exec(ctx, `
|
||||
_, err = tx.Exec(ctx, `
|
||||
INSERT INTO analyze_jobs (session_id, owner, repo, payload)
|
||||
VALUES ($1, $2, $3, $4)
|
||||
ON CONFLICT DO NOTHING
|
||||
@@ -318,18 +350,18 @@ func (s *PGState) AddJob(job queue.AnalyzeJob) {
|
||||
panic("AddJob(): " + err.Error())
|
||||
}
|
||||
|
||||
// 2. determine next job_repo_id for this session ------------
|
||||
// 2. Get next job_repo_id atomically using sequence --------
|
||||
var nextID int
|
||||
err = s.pool.QueryRow(ctx, `
|
||||
SELECT COUNT(*) FROM job_repo_map WHERE session_id = $1
|
||||
err = tx.QueryRow(ctx, `
|
||||
SELECT get_next_job_repo_id($1)
|
||||
`, js.SessionID).Scan(&nextID)
|
||||
if err != nil {
|
||||
slog.Error("AddJob: lookup next job_repo_id failed", "session", js.SessionID, "error", err)
|
||||
slog.Error("AddJob: get_next_job_repo_id failed", "session", js.SessionID, "error", err)
|
||||
panic("AddJob(): " + err.Error())
|
||||
}
|
||||
|
||||
// 3. save mapping (ignore duplicate rows) -------------------
|
||||
_, err = s.pool.Exec(ctx, `
|
||||
// 3. Save mapping with the atomically generated ID ---------
|
||||
_, err = tx.Exec(ctx, `
|
||||
INSERT INTO job_repo_map (session_id, job_repo_id, owner, repo)
|
||||
VALUES ($1, $2, $3, $4)
|
||||
ON CONFLICT DO NOTHING
|
||||
@@ -340,6 +372,12 @@ func (s *PGState) AddJob(job queue.AnalyzeJob) {
|
||||
panic("AddJob(): " + err.Error())
|
||||
}
|
||||
|
||||
// Commit the transaction
|
||||
if err = tx.Commit(ctx); err != nil {
|
||||
slog.Error("AddJob: failed to commit transaction", "job", js, "error", err)
|
||||
panic("AddJob(): " + err.Error())
|
||||
}
|
||||
|
||||
slog.Debug("AddJob stored", "session", js.SessionID, "jobRepoId", nextID, "owner", js.Owner, "repo", js.Repo)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user