Fix: get the correct repo_id from postgres, use the mapping in responses
This commit is contained in:
committed by
=Michael Hohn
parent
807d5f3d45
commit
750187fb12
@@ -176,9 +176,8 @@ func (c *CommanderSingle) submitStatusResponse(w http.ResponseWriter, js common.
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Loop through all jobs under the same session id
|
// Loop through all jobs under the same session id
|
||||||
// The jobRepoId from the range should now match the stored job_repo_id
|
// fix
|
||||||
// due to the ORDER BY job_repo_id in GetJobList()
|
for _, job := range jobs {
|
||||||
for jobRepoId, job := range jobs {
|
|
||||||
|
|
||||||
// Get the job status
|
// Get the job status
|
||||||
status, err := c.v.State.GetStatus(job.Spec)
|
status, err := c.v.State.GetStatus(job.Spec)
|
||||||
@@ -211,6 +210,8 @@ func (c *CommanderSingle) submitStatusResponse(w http.ResponseWriter, js common.
|
|||||||
}
|
}
|
||||||
resultCount = jobResult.ResultCount
|
resultCount = jobResult.ResultCount
|
||||||
}
|
}
|
||||||
|
// Get jobRepoID from (owner,repo)
|
||||||
|
jobRepoId := c.v.State.GetRepoId(job.Spec.NameWithOwner)
|
||||||
|
|
||||||
// Append all scanned (complete and incomplete) repos to the response
|
// Append all scanned (complete and incomplete) repos to the response
|
||||||
scannedRepos = append(scannedRepos,
|
scannedRepos = append(scannedRepos,
|
||||||
|
|||||||
@@ -18,6 +18,9 @@ type ServerState interface {
|
|||||||
// TODO: fix this hacky logic
|
// TODO: fix this hacky logic
|
||||||
GetJobSpecByRepoId(sessionId int, jobRepoId int) (common.JobSpec, error)
|
GetJobSpecByRepoId(sessionId int, jobRepoId int) (common.JobSpec, error)
|
||||||
|
|
||||||
|
// The repo id is uniquely determined by NameWithOwner
|
||||||
|
GetRepoId(owner common.NameWithOwner) int
|
||||||
|
|
||||||
// SetResult stores the analysis result for the specified session ID and repository.
|
// SetResult stores the analysis result for the specified session ID and repository.
|
||||||
SetResult(js common.JobSpec, ar queue.AnalyzeResult)
|
SetResult(js common.JobSpec, ar queue.AnalyzeResult)
|
||||||
|
|
||||||
|
|||||||
@@ -3,14 +3,12 @@ package state
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
"github.com/hohn/mrvacommander/pkg/common"
|
"github.com/hohn/mrvacommander/pkg/common"
|
||||||
"github.com/hohn/mrvacommander/pkg/queue"
|
"github.com/hohn/mrvacommander/pkg/queue"
|
||||||
"github.com/jackc/pgx/v5"
|
|
||||||
"github.com/jackc/pgx/v5/pgxpool"
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -297,89 +295,26 @@ func (s *PGState) GetStatus(js common.JobSpec) (common.Status, error) {
|
|||||||
return common.Status(status), nil
|
return common.Status(status), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PGState) BijectJobRepo(
|
// GetRepoId returns a stable unique ID for a given (owner, repo).
|
||||||
ctx context.Context,
|
// If the pair doesn't exist, it is inserted atomically.
|
||||||
owner *string,
|
func (s *PGState) GetRepoId(cno common.NameWithOwner) int {
|
||||||
repo *string,
|
ctx := context.Background()
|
||||||
jobRepoID *int,
|
|
||||||
) error {
|
|
||||||
switch {
|
|
||||||
|
|
||||||
case owner != nil && repo != nil && jobRepoID != nil:
|
var jobRepoID int
|
||||||
// Try reverse lookup
|
|
||||||
var gotOwner, gotRepo string
|
|
||||||
err := s.pool.QueryRow(ctx,
|
|
||||||
`SELECT owner, repo FROM job_repo_map WHERE job_repo_id = $1`,
|
|
||||||
*jobRepoID).Scan(&gotOwner, &gotRepo)
|
|
||||||
|
|
||||||
if err == nil {
|
err := s.pool.QueryRow(ctx, `
|
||||||
// Entry exists — check consistency
|
|
||||||
if gotOwner != *owner || gotRepo != *repo {
|
|
||||||
slog.Error("BijectJobRepo: ID-to-(owner,repo) mismatch",
|
|
||||||
"job_repo_id", *jobRepoID,
|
|
||||||
"expected_owner", gotOwner,
|
|
||||||
"expected_repo", gotRepo,
|
|
||||||
"given_owner", *owner,
|
|
||||||
"given_repo", *repo)
|
|
||||||
return fmt.Errorf("BijectJobRepo mismatch: (%s, %s) ≠ (%s, %s)", *owner, *repo, gotOwner, gotRepo)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// If not found, insert (owner, repo) and get correct job_repo_id
|
|
||||||
if errors.Is(err, pgx.ErrNoRows) {
|
|
||||||
slog.Info("BijectJobRepo: entry missing, inserting new mapping",
|
|
||||||
"owner", *owner, "repo", *repo)
|
|
||||||
|
|
||||||
return s.pool.QueryRow(ctx, `
|
|
||||||
INSERT INTO job_repo_map (owner, repo)
|
INSERT INTO job_repo_map (owner, repo)
|
||||||
VALUES ($1, $2)
|
VALUES ($1, $2)
|
||||||
ON CONFLICT (owner, repo) DO UPDATE SET owner = EXCLUDED.owner
|
ON CONFLICT (owner, repo) DO UPDATE SET owner = EXCLUDED.owner
|
||||||
RETURNING job_repo_id
|
RETURNING job_repo_id
|
||||||
`, *owner, *repo).Scan(jobRepoID)
|
`, cno.Owner, cno.Repo).Scan(&jobRepoID)
|
||||||
}
|
if err != nil {
|
||||||
|
slog.Error("GetRepoId failed", "NameWithOwner", cno, "error", err)
|
||||||
// Some other DB error
|
panic("GetRepoId: " + err.Error())
|
||||||
slog.Error("BijectJobRepo: reverse lookup failed",
|
|
||||||
"job_repo_id", *jobRepoID,
|
|
||||||
"error", err)
|
|
||||||
return fmt.Errorf("reverse lookup failed: %w", err)
|
|
||||||
|
|
||||||
case owner != nil && repo != nil:
|
|
||||||
// Forward insert or lookup
|
|
||||||
err := s.pool.QueryRow(ctx, `
|
|
||||||
INSERT INTO job_repo_map (owner, repo)
|
|
||||||
VALUES ($1, $2)
|
|
||||||
ON CONFLICT (owner, repo) DO UPDATE SET owner = EXCLUDED.owner
|
|
||||||
RETURNING job_repo_id
|
|
||||||
`, *owner, *repo).Scan(jobRepoID)
|
|
||||||
if err != nil {
|
|
||||||
slog.Error("BijectJobRepo: insert/lookup failed",
|
|
||||||
"owner", *owner,
|
|
||||||
"repo", *repo,
|
|
||||||
"error", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
|
|
||||||
case jobRepoID != nil:
|
|
||||||
// Reverse lookup
|
|
||||||
err := s.pool.QueryRow(ctx, `
|
|
||||||
SELECT owner, repo FROM job_repo_map WHERE job_repo_id = $1
|
|
||||||
`, *jobRepoID).Scan(owner, repo)
|
|
||||||
if err != nil {
|
|
||||||
slog.Error("BijectJobRepo: reverse lookup failed",
|
|
||||||
"job_repo_id", *jobRepoID,
|
|
||||||
"error", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
|
|
||||||
default:
|
|
||||||
err := fmt.Errorf("BijectJobRepo: at least one of (owner, repo) or jobRepoID must be non-nil")
|
|
||||||
slog.Error("BijectJobRepo: bad call", "error", err)
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return jobRepoID
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PGState) AddJob(job queue.AnalyzeJob) {
|
func (s *PGState) AddJob(job queue.AnalyzeJob) {
|
||||||
@@ -412,12 +347,7 @@ func (s *PGState) AddJob(job queue.AnalyzeJob) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 2. Get job_repo_id
|
// 2. Get job_repo_id
|
||||||
var jobRepoID int
|
jobRepoID := s.GetRepoId(job.Spec.NameWithOwner)
|
||||||
err = s.BijectJobRepo(ctx, &js.Owner, &js.Repo, &jobRepoID)
|
|
||||||
if err != nil {
|
|
||||||
slog.Error("BijectJobRepo failed", "jobspec", js, "error", err)
|
|
||||||
panic("BijectJobRepo: " + err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Commit the transaction
|
// Commit the transaction
|
||||||
if err = tx.Commit(ctx); err != nil {
|
if err = tx.Commit(ctx); err != nil {
|
||||||
|
|||||||
Reference in New Issue
Block a user