wip: postgres state: update SetupSchemas, add GetJobSpecByRepoId
This commit is contained in:
@@ -59,6 +59,19 @@ func SetupSchemas(pool *pgxpool.Pool) {
|
|||||||
name string
|
name string
|
||||||
sql string
|
sql string
|
||||||
}{
|
}{
|
||||||
|
{
|
||||||
|
name: "job_repo_map",
|
||||||
|
sql: `
|
||||||
|
CREATE TABLE IF NOT EXISTS job_repo_map (
|
||||||
|
session_id INTEGER NOT NULL,
|
||||||
|
job_repo_id INTEGER NOT NULL,
|
||||||
|
owner TEXT NOT NULL,
|
||||||
|
repo TEXT NOT NULL,
|
||||||
|
PRIMARY KEY (session_id, job_repo_id)
|
||||||
|
);
|
||||||
|
`,
|
||||||
|
},
|
||||||
|
|
||||||
{
|
{
|
||||||
name: "session_id_seq",
|
name: "session_id_seq",
|
||||||
sql: `
|
sql: `
|
||||||
@@ -246,7 +259,8 @@ func (s *PGState) AddJob(job queue.AnalyzeJob) {
|
|||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
js := job.Spec
|
js := job.Spec
|
||||||
|
|
||||||
jobJSON, err := json.Marshal(job)
|
// 1. store AnalyzeJob payload -------------------------------
|
||||||
|
jb, err := json.Marshal(job)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Error("AddJob: marshal failed", "job", js, "error", err)
|
slog.Error("AddJob: marshal failed", "job", js, "error", err)
|
||||||
panic("AddJob(): " + err.Error())
|
panic("AddJob(): " + err.Error())
|
||||||
@@ -256,12 +270,37 @@ func (s *PGState) AddJob(job queue.AnalyzeJob) {
|
|||||||
INSERT INTO analyze_jobs (session_id, owner, repo, payload)
|
INSERT INTO analyze_jobs (session_id, owner, repo, payload)
|
||||||
VALUES ($1, $2, $3, $4)
|
VALUES ($1, $2, $3, $4)
|
||||||
ON CONFLICT DO NOTHING
|
ON CONFLICT DO NOTHING
|
||||||
`, js.SessionID, js.Owner, js.Repo, jobJSON)
|
`, js.SessionID, js.Owner, js.Repo, jb)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Error("AddJob: insert failed", "job", js, "error", err)
|
slog.Error("AddJob: insert analyze_jobs failed", "job", js, "error", err)
|
||||||
panic("AddJob(): " + err.Error())
|
panic("AddJob(): " + err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 2. determine next job_repo_id for this session ------------
|
||||||
|
var nextID int
|
||||||
|
err = s.pool.QueryRow(ctx, `
|
||||||
|
SELECT COALESCE(MAX(job_repo_id)+1, 0)
|
||||||
|
FROM job_repo_map
|
||||||
|
WHERE session_id = $1
|
||||||
|
`, js.SessionID).Scan(&nextID)
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("AddJob: lookup next job_repo_id failed", "session", js.SessionID, "error", err)
|
||||||
|
panic("AddJob(): " + err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. save mapping (ignore duplicate rows) -------------------
|
||||||
|
_, err = s.pool.Exec(ctx, `
|
||||||
|
INSERT INTO job_repo_map (session_id, job_repo_id, owner, repo)
|
||||||
|
VALUES ($1, $2, $3, $4)
|
||||||
|
ON CONFLICT DO NOTHING
|
||||||
|
`, js.SessionID, nextID, js.Owner, js.Repo)
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("AddJob: insert job_repo_map failed",
|
||||||
|
"session", js.SessionID, "jobRepoId", nextID, "error", err)
|
||||||
|
panic("AddJob(): " + err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
slog.Debug("AddJob stored", "session", js.SessionID, "jobRepoId", nextID, "owner", js.Owner, "repo", js.Repo)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PGState) GetJobList(sessionId int) ([]queue.AnalyzeJob, error) {
|
func (s *PGState) GetJobList(sessionId int) ([]queue.AnalyzeJob, error) {
|
||||||
@@ -291,3 +330,24 @@ func (s *PGState) GetJobList(sessionId int) ([]queue.AnalyzeJob, error) {
|
|||||||
|
|
||||||
return jobs, rows.Err()
|
return jobs, rows.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *PGState) GetJobSpecByRepoId(sessionId, jobRepoId int) (common.JobSpec, error) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
var owner, repo string
|
||||||
|
err := s.pool.QueryRow(ctx, `
|
||||||
|
SELECT owner, repo
|
||||||
|
FROM job_repo_map
|
||||||
|
WHERE session_id = $1 AND job_repo_id = $2
|
||||||
|
`, sessionId, jobRepoId).Scan(&owner, &repo)
|
||||||
|
if err != nil {
|
||||||
|
return common.JobSpec{}, err
|
||||||
|
}
|
||||||
|
return common.JobSpec{
|
||||||
|
SessionID: sessionId,
|
||||||
|
NameWithOwner: common.NameWithOwner{
|
||||||
|
Owner: owner,
|
||||||
|
Repo: repo,
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user