Fix: AddJob was completely wrong. The job_repo_id is a bijection; updated all code

This commit is contained in:
Michael Hohn
2025-08-04 13:30:43 -07:00
committed by =Michael Hohn
parent 1377d4cec9
commit 807d5f3d45
4 changed files with 115 additions and 108 deletions

View File

@@ -179,20 +179,7 @@ func (c *CommanderSingle) submitStatusResponse(w http.ResponseWriter, js common.
// The jobRepoId from the range should now match the stored job_repo_id
// due to the ORDER BY job_repo_id in GetJobList()
for jobRepoId, job := range jobs {
// Validate that the slice index matches the expected repo ID
// This helps catch ordering issues between different state implementations
if expectedJobSpec, err := c.v.State.GetJobSpecByRepoId(js.SessionID, jobRepoId); err != nil {
slog.Warn("Job repo ID validation failed",
"sessionId", js.SessionID,
"jobRepoId", jobRepoId,
"error", err)
} else if expectedJobSpec.Owner != job.Spec.Owner || expectedJobSpec.Repo != job.Spec.Repo {
slog.Error("Job repo ID mismatch detected",
"sessionId", js.SessionID,
"jobRepoId", jobRepoId,
"expected", expectedJobSpec.NameWithOwner,
"actual", job.Spec.NameWithOwner)
}
// Get the job status
status, err := c.v.State.GetStatus(job.Spec)
if err != nil {

View File

@@ -3,12 +3,14 @@ package state
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"os"
"github.com/hohn/mrvacommander/pkg/common"
"github.com/hohn/mrvacommander/pkg/queue"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
@@ -88,46 +90,20 @@ func SetupSchemas(pool *pgxpool.Pool) {
{
name: "job_repo_map",
sql: `
CREATE TABLE IF NOT EXISTS job_repo_map (
session_id INTEGER NOT NULL,
job_repo_id INTEGER NOT NULL,
owner TEXT NOT NULL,
repo TEXT NOT NULL,
PRIMARY KEY (session_id, job_repo_id)
);
CREATE TABLE IF NOT EXISTS job_repo_map (
job_repo_id SERIAL PRIMARY KEY,
owner TEXT NOT NULL,
repo TEXT NOT NULL,
UNIQUE(owner, repo)
);
`,
},
{
name: "session_id_seq",
sql: `
CREATE SEQUENCE IF NOT EXISTS session_id_seq;
`,
},
{
name: "job_repo_id_sequences",
sql: `
-- Create a function to get or create a sequence for each session
CREATE OR REPLACE FUNCTION get_next_job_repo_id(session_id_param INTEGER)
RETURNS INTEGER AS $$
DECLARE
seq_name TEXT;
next_id INTEGER;
BEGIN
-- Generate sequence name based on session_id
seq_name := 'job_repo_id_seq_' || session_id_param;
-- Create sequence if it doesn't exist
EXECUTE format('CREATE SEQUENCE IF NOT EXISTS %I START 1', seq_name);
-- Get next value from the sequence
EXECUTE format('SELECT nextval(%L)', seq_name) INTO next_id;
RETURN next_id;
END;
$$ LANGUAGE plpgsql;
`,
},
{
name: "analyze_results",
sql: `
@@ -321,6 +297,91 @@ func (s *PGState) GetStatus(js common.JobSpec) (common.Status, error) {
return common.Status(status), nil
}
func (s *PGState) BijectJobRepo(
ctx context.Context,
owner *string,
repo *string,
jobRepoID *int,
) error {
switch {
case owner != nil && repo != nil && jobRepoID != nil:
// Try reverse lookup
var gotOwner, gotRepo string
err := s.pool.QueryRow(ctx,
`SELECT owner, repo FROM job_repo_map WHERE job_repo_id = $1`,
*jobRepoID).Scan(&gotOwner, &gotRepo)
if err == nil {
// Entry exists — check consistency
if gotOwner != *owner || gotRepo != *repo {
slog.Error("BijectJobRepo: ID-to-(owner,repo) mismatch",
"job_repo_id", *jobRepoID,
"expected_owner", gotOwner,
"expected_repo", gotRepo,
"given_owner", *owner,
"given_repo", *repo)
return fmt.Errorf("BijectJobRepo mismatch: (%s, %s) ≠ (%s, %s)", *owner, *repo, gotOwner, gotRepo)
}
return nil
}
// If not found, insert (owner, repo) and get correct job_repo_id
if errors.Is(err, pgx.ErrNoRows) {
slog.Info("BijectJobRepo: entry missing, inserting new mapping",
"owner", *owner, "repo", *repo)
return s.pool.QueryRow(ctx, `
INSERT INTO job_repo_map (owner, repo)
VALUES ($1, $2)
ON CONFLICT (owner, repo) DO UPDATE SET owner = EXCLUDED.owner
RETURNING job_repo_id
`, *owner, *repo).Scan(jobRepoID)
}
// Some other DB error
slog.Error("BijectJobRepo: reverse lookup failed",
"job_repo_id", *jobRepoID,
"error", err)
return fmt.Errorf("reverse lookup failed: %w", err)
case owner != nil && repo != nil:
// Forward insert or lookup
err := s.pool.QueryRow(ctx, `
INSERT INTO job_repo_map (owner, repo)
VALUES ($1, $2)
ON CONFLICT (owner, repo) DO UPDATE SET owner = EXCLUDED.owner
RETURNING job_repo_id
`, *owner, *repo).Scan(jobRepoID)
if err != nil {
slog.Error("BijectJobRepo: insert/lookup failed",
"owner", *owner,
"repo", *repo,
"error", err)
return err
}
return nil
case jobRepoID != nil:
// Reverse lookup
err := s.pool.QueryRow(ctx, `
SELECT owner, repo FROM job_repo_map WHERE job_repo_id = $1
`, *jobRepoID).Scan(owner, repo)
if err != nil {
slog.Error("BijectJobRepo: reverse lookup failed",
"job_repo_id", *jobRepoID,
"error", err)
return err
}
return nil
default:
err := fmt.Errorf("BijectJobRepo: at least one of (owner, repo) or jobRepoID must be non-nil")
slog.Error("BijectJobRepo: bad call", "error", err)
return err
}
}
func (s *PGState) AddJob(job queue.AnalyzeJob) {
ctx := context.Background()
js := job.Spec
@@ -350,26 +411,12 @@ func (s *PGState) AddJob(job queue.AnalyzeJob) {
panic("AddJob(): " + err.Error())
}
// 2. Get next job_repo_id atomically using sequence --------
var nextID int
err = tx.QueryRow(ctx, `
SELECT get_next_job_repo_id($1)
`, js.SessionID).Scan(&nextID)
// 2. Get job_repo_id
var jobRepoID int
err = s.BijectJobRepo(ctx, &js.Owner, &js.Repo, &jobRepoID)
if err != nil {
slog.Error("AddJob: get_next_job_repo_id failed", "session", js.SessionID, "error", err) /* XX: hit here */
panic("AddJob(): " + err.Error())
}
// 3. Save mapping with the atomically generated ID ---------
_, err = tx.Exec(ctx, `
INSERT INTO job_repo_map (session_id, job_repo_id, owner, repo)
VALUES ($1, $2, $3, $4)
ON CONFLICT DO NOTHING
`, js.SessionID, nextID, js.Owner, js.Repo)
if err != nil {
slog.Error("AddJob: insert job_repo_map failed",
"session", js.SessionID, "jobRepoId", nextID, "error", err)
panic("AddJob(): " + err.Error())
slog.Error("BijectJobRepo failed", "jobspec", js, "error", err)
panic("BijectJobRepo: " + err.Error())
}
// Commit the transaction
@@ -378,20 +425,19 @@ func (s *PGState) AddJob(job queue.AnalyzeJob) {
panic("AddJob(): " + err.Error())
}
slog.Debug("AddJob stored", "session", js.SessionID, "jobRepoId", nextID, "owner", js.Owner, "repo", js.Repo)
slog.Debug("AddJob stored", "session", js.SessionID, "jobRepoId", jobRepoID, "owner", js.Owner, "repo", js.Repo)
}
func (s *PGState) GetJobList(sessionId int) ([]queue.AnalyzeJob, error) {
ctx := context.Background()
rows, err := s.pool.Query(ctx, `
SELECT aj.payload FROM analyze_jobs aj
JOIN job_repo_map jrm ON aj.session_id = jrm.session_id
AND aj.owner = jrm.owner AND aj.repo = jrm.repo
WHERE aj.session_id = $1
ORDER BY jrm.job_repo_id
SELECT payload FROM analyze_jobs
WHERE session_id = $1
ORDER BY owner, repo
`, sessionId)
if err != nil {
slog.Error("GetJobList: query failed", "session_id", sessionId, "error", err)
return nil, err
}
defer rows.Close()
@@ -400,16 +446,23 @@ func (s *PGState) GetJobList(sessionId int) ([]queue.AnalyzeJob, error) {
for rows.Next() {
var jsonBytes []byte
if err := rows.Scan(&jsonBytes); err != nil {
slog.Error("GetJobList: scan failed", "error", err)
return nil, err
}
var job queue.AnalyzeJob
if err := json.Unmarshal(jsonBytes, &job); err != nil {
slog.Error("GetJobList: unmarshal failed", "error", err)
return nil, err
}
jobs = append(jobs, job)
}
return jobs, rows.Err()
if err := rows.Err(); err != nil {
slog.Error("GetJobList: rows iteration failed", "error", err)
return nil, err
}
return jobs, nil
}
func (s *PGState) GetJobSpecByRepoId(sessionId, jobRepoId int) (common.JobSpec, error) {
@@ -419,8 +472,8 @@ func (s *PGState) GetJobSpecByRepoId(sessionId, jobRepoId int) (common.JobSpec,
err := s.pool.QueryRow(ctx, `
SELECT owner, repo
FROM job_repo_map
WHERE session_id = $1 AND job_repo_id = $2
`, sessionId, jobRepoId).Scan(&owner, &repo)
WHERE job_repo_id = $1
`, jobRepoId).Scan(&owner, &repo)
if err != nil {
return common.JobSpec{}, err
}