From f920a799d3d64894100d89fbc405290297b6dae6 Mon Sep 17 00:00:00 2001 From: Michael Hohn Date: Fri, 11 Jul 2025 15:24:48 -0700 Subject: [PATCH] wip: postgres state: update SetupSchemas, add GetJobSpecByRepoId --- pkg/state/state_postgres.go | 68 ++++++++++++++++++++++++++++++++++--- 1 file changed, 64 insertions(+), 4 deletions(-) diff --git a/pkg/state/state_postgres.go b/pkg/state/state_postgres.go index bac75e1..19da467 100644 --- a/pkg/state/state_postgres.go +++ b/pkg/state/state_postgres.go @@ -59,6 +59,19 @@ func SetupSchemas(pool *pgxpool.Pool) { name string sql string }{ + { + name: "job_repo_map", + sql: ` + CREATE TABLE IF NOT EXISTS job_repo_map ( + session_id INTEGER NOT NULL, + job_repo_id INTEGER NOT NULL, + owner TEXT NOT NULL, + repo TEXT NOT NULL, + PRIMARY KEY (session_id, job_repo_id) + ); + `, + }, + { name: "session_id_seq", sql: ` @@ -246,7 +259,8 @@ func (s *PGState) AddJob(job queue.AnalyzeJob) { ctx := context.Background() js := job.Spec - jobJSON, err := json.Marshal(job) + // 1. store AnalyzeJob payload ------------------------------- + jb, err := json.Marshal(job) if err != nil { slog.Error("AddJob: marshal failed", "job", js, "error", err) panic("AddJob(): " + err.Error()) @@ -256,12 +270,37 @@ func (s *PGState) AddJob(job queue.AnalyzeJob) { INSERT INTO analyze_jobs (session_id, owner, repo, payload) VALUES ($1, $2, $3, $4) ON CONFLICT DO NOTHING - `, js.SessionID, js.Owner, js.Repo, jobJSON) - + `, js.SessionID, js.Owner, js.Repo, jb) if err != nil { - slog.Error("AddJob: insert failed", "job", js, "error", err) + slog.Error("AddJob: insert analyze_jobs failed", "job", js, "error", err) panic("AddJob(): " + err.Error()) } + + // 2. determine next job_repo_id for this session ------------ + var nextID int + err = s.pool.QueryRow(ctx, ` + SELECT COALESCE(MAX(job_repo_id)+1, 0) + FROM job_repo_map + WHERE session_id = $1 + `, js.SessionID).Scan(&nextID) + if err != nil { + slog.Error("AddJob: lookup next job_repo_id failed", "session", js.SessionID, "error", err) + panic("AddJob(): " + err.Error()) + } + + // 3. save mapping (ignore duplicate rows) ------------------- + _, err = s.pool.Exec(ctx, ` + INSERT INTO job_repo_map (session_id, job_repo_id, owner, repo) + VALUES ($1, $2, $3, $4) + ON CONFLICT DO NOTHING + `, js.SessionID, nextID, js.Owner, js.Repo) + if err != nil { + slog.Error("AddJob: insert job_repo_map failed", + "session", js.SessionID, "jobRepoId", nextID, "error", err) + panic("AddJob(): " + err.Error()) + } + + slog.Debug("AddJob stored", "session", js.SessionID, "jobRepoId", nextID, "owner", js.Owner, "repo", js.Repo) } func (s *PGState) GetJobList(sessionId int) ([]queue.AnalyzeJob, error) { @@ -291,3 +330,24 @@ func (s *PGState) GetJobList(sessionId int) ([]queue.AnalyzeJob, error) { return jobs, rows.Err() } + +func (s *PGState) GetJobSpecByRepoId(sessionId, jobRepoId int) (common.JobSpec, error) { + ctx := context.Background() + + var owner, repo string + err := s.pool.QueryRow(ctx, ` + SELECT owner, repo + FROM job_repo_map + WHERE session_id = $1 AND job_repo_id = $2 + `, sessionId, jobRepoId).Scan(&owner, &repo) + if err != nil { + return common.JobSpec{}, err + } + return common.JobSpec{ + SessionID: sessionId, + NameWithOwner: common.NameWithOwner{ + Owner: owner, + Repo: repo, + }, + }, nil +}