diff --git a/go.mod b/go.mod index d200f65..dd8a2b6 100644 --- a/go.mod +++ b/go.mod @@ -4,33 +4,25 @@ go 1.22.0 require ( github.com/BurntSushi/toml v1.4.0 - github.com/elastic/go-sysinfo v1.14.0 github.com/google/uuid v1.6.0 github.com/gorilla/mux v1.8.1 + github.com/jackc/pgx/v5 v5.6.0 github.com/minio/minio-go/v7 v7.0.71 github.com/rabbitmq/amqp091-go v1.10.0 golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f gopkg.in/yaml.v3 v3.0.1 - gorm.io/driver/postgres v1.5.9 - gorm.io/gorm v1.25.10 ) require ( github.com/dustin/go-humanize v1.0.1 // indirect - github.com/elastic/go-windows v1.0.1 // indirect github.com/goccy/go-json v0.10.2 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect - github.com/jackc/pgx/v5 v5.6.0 // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect - github.com/jinzhu/inflection v1.0.0 // indirect - github.com/jinzhu/now v1.1.5 // indirect github.com/klauspost/compress v1.17.6 // indirect github.com/klauspost/cpuid/v2 v2.2.6 // indirect github.com/kr/text v0.2.0 // indirect github.com/minio/md5-simd v1.1.2 // indirect - github.com/pkg/errors v0.9.1 // indirect - github.com/prometheus/procfs v0.15.1 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/rs/xid v1.5.0 // indirect golang.org/x/crypto v0.24.0 // indirect @@ -39,5 +31,4 @@ require ( golang.org/x/sys v0.21.0 // indirect golang.org/x/text v0.16.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect - howett.net/plist v1.0.1 // indirect ) diff --git a/go.sum b/go.sum index bff5962..61593ce 100644 --- a/go.sum +++ b/go.sum @@ -6,14 +6,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= -github.com/elastic/go-sysinfo v1.14.0 h1:dQRtiqLycoOOla7IflZg3aN213vqJmP0lpVpKQ9lUEY= -github.com/elastic/go-sysinfo v1.14.0/go.mod h1:FKUXnZWhnYI0ueO7jhsGV3uQJ5hiz8OqM5b3oGyaRr8= -github.com/elastic/go-windows v1.0.1 h1:AlYZOldA+UJ0/2nBuqWdo90GFCgG9xuyw9SYzGUtJm0= -github.com/elastic/go-windows v1.0.1/go.mod h1:FoVvqWSun28vaDQPbj2Elfc0JahhPB7WQEGa3c814Ss= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= @@ -26,11 +20,6 @@ github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY= github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw= github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= -github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= -github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= -github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= -github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= -github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/klauspost/compress v1.17.6 h1:60eq2E/jlfwQXtvZEeBUYADs+BwKBWURIY+Gj2eRGjI= github.com/klauspost/compress v1.17.6/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= @@ -44,13 +33,8 @@ github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= github.com/minio/minio-go/v7 v7.0.71 h1:No9XfOKTYi6i0GnBj+WZwD8WP5GZfL7n7GOjRqCdAjA= github.com/minio/minio-go/v7 v7.0.71/go.mod h1:4yBA8v80xGA30cfM3fz0DKYMXunWl/AV/6tWEs9ryzo= -github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= -github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= -github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= @@ -72,7 +56,6 @@ golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/sync v0.9.0 h1:fEo0HyrW1GIgZdpbhCRO0PkJajUS5H9IFUztCgEo2jQ= golang.org/x/sync v0.9.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= @@ -83,13 +66,6 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= -gopkg.in/yaml.v1 v1.0.0-20140924161607-9f9df34309c0/go.mod h1:WDnlLJ4WF5VGsH/HVa3CI79GS0ol3YnhVnKP89i0kNg= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gorm.io/driver/postgres v1.5.9 h1:DkegyItji119OlcaLjqN11kHoUgZ/j13E0jkJZgD6A8= -gorm.io/driver/postgres v1.5.9/go.mod h1:DX3GReXH+3FPWGrrgffdvCk3DQ1dwDPdmbenSkweRGI= -gorm.io/gorm v1.25.10 h1:dQpO+33KalOA+aFYGlK+EfxcI5MbO7EP2yYygwh9h+s= -gorm.io/gorm v1.25.10/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8= -howett.net/plist v1.0.1 h1:37GdZ8tP09Q35o9ych3ehygcsL+HqKSwzctveSlarvM= -howett.net/plist v1.0.1/go.mod h1:lqaXoTrLY4hg8tnEzNru53gicrbv7rrk+2xJA/7hw9g= diff --git a/pkg/server/server.go b/pkg/server/server.go index a6a24f4..df10e96 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -179,20 +179,7 @@ func (c *CommanderSingle) submitStatusResponse(w http.ResponseWriter, js common. // The jobRepoId from the range should now match the stored job_repo_id // due to the ORDER BY job_repo_id in GetJobList() for jobRepoId, job := range jobs { - // Validate that the slice index matches the expected repo ID - // This helps catch ordering issues between different state implementations - if expectedJobSpec, err := c.v.State.GetJobSpecByRepoId(js.SessionID, jobRepoId); err != nil { - slog.Warn("Job repo ID validation failed", - "sessionId", js.SessionID, - "jobRepoId", jobRepoId, - "error", err) - } else if expectedJobSpec.Owner != job.Spec.Owner || expectedJobSpec.Repo != job.Spec.Repo { - slog.Error("Job repo ID mismatch detected", - "sessionId", js.SessionID, - "jobRepoId", jobRepoId, - "expected", expectedJobSpec.NameWithOwner, - "actual", job.Spec.NameWithOwner) - } + // Get the job status status, err := c.v.State.GetStatus(job.Spec) if err != nil { diff --git a/pkg/state/state_postgres.go b/pkg/state/state_postgres.go index 19e5818..b3569fc 100644 --- a/pkg/state/state_postgres.go +++ b/pkg/state/state_postgres.go @@ -3,12 +3,14 @@ package state import ( "context" "encoding/json" + "errors" "fmt" "log/slog" "os" "github.com/hohn/mrvacommander/pkg/common" "github.com/hohn/mrvacommander/pkg/queue" + "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" ) @@ -88,46 +90,20 @@ func SetupSchemas(pool *pgxpool.Pool) { { name: "job_repo_map", sql: ` - CREATE TABLE IF NOT EXISTS job_repo_map ( - session_id INTEGER NOT NULL, - job_repo_id INTEGER NOT NULL, - owner TEXT NOT NULL, - repo TEXT NOT NULL, - PRIMARY KEY (session_id, job_repo_id) - ); + CREATE TABLE IF NOT EXISTS job_repo_map ( + job_repo_id SERIAL PRIMARY KEY, + owner TEXT NOT NULL, + repo TEXT NOT NULL, + UNIQUE(owner, repo) + ); `, }, - { name: "session_id_seq", sql: ` CREATE SEQUENCE IF NOT EXISTS session_id_seq; `, }, - { - name: "job_repo_id_sequences", - sql: ` - -- Create a function to get or create a sequence for each session - CREATE OR REPLACE FUNCTION get_next_job_repo_id(session_id_param INTEGER) - RETURNS INTEGER AS $$ - DECLARE - seq_name TEXT; - next_id INTEGER; - BEGIN - -- Generate sequence name based on session_id - seq_name := 'job_repo_id_seq_' || session_id_param; - - -- Create sequence if it doesn't exist - EXECUTE format('CREATE SEQUENCE IF NOT EXISTS %I START 1', seq_name); - - -- Get next value from the sequence - EXECUTE format('SELECT nextval(%L)', seq_name) INTO next_id; - - RETURN next_id; - END; - $$ LANGUAGE plpgsql; - `, - }, { name: "analyze_results", sql: ` @@ -321,6 +297,91 @@ func (s *PGState) GetStatus(js common.JobSpec) (common.Status, error) { return common.Status(status), nil } +func (s *PGState) BijectJobRepo( + ctx context.Context, + owner *string, + repo *string, + jobRepoID *int, +) error { + switch { + + case owner != nil && repo != nil && jobRepoID != nil: + // Try reverse lookup + var gotOwner, gotRepo string + err := s.pool.QueryRow(ctx, + `SELECT owner, repo FROM job_repo_map WHERE job_repo_id = $1`, + *jobRepoID).Scan(&gotOwner, &gotRepo) + + if err == nil { + // Entry exists — check consistency + if gotOwner != *owner || gotRepo != *repo { + slog.Error("BijectJobRepo: ID-to-(owner,repo) mismatch", + "job_repo_id", *jobRepoID, + "expected_owner", gotOwner, + "expected_repo", gotRepo, + "given_owner", *owner, + "given_repo", *repo) + return fmt.Errorf("BijectJobRepo mismatch: (%s, %s) ≠ (%s, %s)", *owner, *repo, gotOwner, gotRepo) + } + return nil + } + + // If not found, insert (owner, repo) and get correct job_repo_id + if errors.Is(err, pgx.ErrNoRows) { + slog.Info("BijectJobRepo: entry missing, inserting new mapping", + "owner", *owner, "repo", *repo) + + return s.pool.QueryRow(ctx, ` + INSERT INTO job_repo_map (owner, repo) + VALUES ($1, $2) + ON CONFLICT (owner, repo) DO UPDATE SET owner = EXCLUDED.owner + RETURNING job_repo_id + `, *owner, *repo).Scan(jobRepoID) + } + + // Some other DB error + slog.Error("BijectJobRepo: reverse lookup failed", + "job_repo_id", *jobRepoID, + "error", err) + return fmt.Errorf("reverse lookup failed: %w", err) + + case owner != nil && repo != nil: + // Forward insert or lookup + err := s.pool.QueryRow(ctx, ` + INSERT INTO job_repo_map (owner, repo) + VALUES ($1, $2) + ON CONFLICT (owner, repo) DO UPDATE SET owner = EXCLUDED.owner + RETURNING job_repo_id + `, *owner, *repo).Scan(jobRepoID) + if err != nil { + slog.Error("BijectJobRepo: insert/lookup failed", + "owner", *owner, + "repo", *repo, + "error", err) + return err + } + return nil + + case jobRepoID != nil: + // Reverse lookup + err := s.pool.QueryRow(ctx, ` + SELECT owner, repo FROM job_repo_map WHERE job_repo_id = $1 + `, *jobRepoID).Scan(owner, repo) + if err != nil { + slog.Error("BijectJobRepo: reverse lookup failed", + "job_repo_id", *jobRepoID, + "error", err) + return err + } + return nil + + default: + err := fmt.Errorf("BijectJobRepo: at least one of (owner, repo) or jobRepoID must be non-nil") + slog.Error("BijectJobRepo: bad call", "error", err) + return err + } +} + func (s *PGState) AddJob(job queue.AnalyzeJob) { ctx := context.Background() js := job.Spec @@ -350,26 +411,12 @@ func (s *PGState) AddJob(job queue.AnalyzeJob) { panic("AddJob(): " + err.Error()) } - // 2. Get next job_repo_id atomically using sequence -------- - var nextID int - err = tx.QueryRow(ctx, ` - SELECT get_next_job_repo_id($1) - `, js.SessionID).Scan(&nextID) + // 2. Get job_repo_id + var jobRepoID int + err = s.BijectJobRepo(ctx, &js.Owner, &js.Repo, &jobRepoID) if err != nil { - slog.Error("AddJob: get_next_job_repo_id failed", "session", js.SessionID, "error", err) /* XX: hit here */ - panic("AddJob(): " + err.Error()) - } - - // 3. Save mapping with the atomically generated ID --------- - _, err = tx.Exec(ctx, ` - INSERT INTO job_repo_map (session_id, job_repo_id, owner, repo) - VALUES ($1, $2, $3, $4) - ON CONFLICT DO NOTHING - `, js.SessionID, nextID, js.Owner, js.Repo) - if err != nil { - slog.Error("AddJob: insert job_repo_map failed", - "session", js.SessionID, "jobRepoId", nextID, "error", err) - panic("AddJob(): " + err.Error()) + slog.Error("BijectJobRepo failed", "jobspec", js, "error", err) + panic("BijectJobRepo: " + err.Error()) } // Commit the transaction @@ -378,20 +425,19 @@ func (s *PGState) AddJob(job queue.AnalyzeJob) { panic("AddJob(): " + err.Error()) } - slog.Debug("AddJob stored", "session", js.SessionID, "jobRepoId", nextID, "owner", js.Owner, "repo", js.Repo) + slog.Debug("AddJob stored", "session", js.SessionID, "jobRepoId", jobRepoID, "owner", js.Owner, "repo", js.Repo) } func (s *PGState) GetJobList(sessionId int) ([]queue.AnalyzeJob, error) { ctx := context.Background() rows, err := s.pool.Query(ctx, ` - SELECT aj.payload FROM analyze_jobs aj - JOIN job_repo_map jrm ON aj.session_id = jrm.session_id - AND aj.owner = jrm.owner AND aj.repo = jrm.repo - WHERE aj.session_id = $1 - ORDER BY jrm.job_repo_id + SELECT payload FROM analyze_jobs + WHERE session_id = $1 + ORDER BY owner, repo `, sessionId) if err != nil { + slog.Error("GetJobList: query failed", "session_id", sessionId, "error", err) return nil, err } defer rows.Close() @@ -400,16 +446,23 @@ func (s *PGState) GetJobList(sessionId int) ([]queue.AnalyzeJob, error) { for rows.Next() { var jsonBytes []byte if err := rows.Scan(&jsonBytes); err != nil { + slog.Error("GetJobList: scan failed", "error", err) return nil, err } var job queue.AnalyzeJob if err := json.Unmarshal(jsonBytes, &job); err != nil { + slog.Error("GetJobList: unmarshal failed", "error", err) return nil, err } jobs = append(jobs, job) } - return jobs, rows.Err() + if err := rows.Err(); err != nil { + slog.Error("GetJobList: rows iteration failed", "error", err) + return nil, err + } + + return jobs, nil } func (s *PGState) GetJobSpecByRepoId(sessionId, jobRepoId int) (common.JobSpec, error) { @@ -419,8 +472,8 @@ func (s *PGState) GetJobSpecByRepoId(sessionId, jobRepoId int) (common.JobSpec, err := s.pool.QueryRow(ctx, ` SELECT owner, repo FROM job_repo_map - WHERE session_id = $1 AND job_repo_id = $2 - `, sessionId, jobRepoId).Scan(&owner, &repo) + WHERE job_repo_id = $1 + `, jobRepoId).Scan(&owner, &repo) if err != nil { return common.JobSpec{}, err }