From 41146f5aaf16817b7c3971872c1997c0cbdf91f2 Mon Sep 17 00:00:00 2001 From: Michael Hohn Date: Fri, 11 Jul 2025 14:47:30 -0700 Subject: [PATCH] wip: postgres state: update SetupSchemas, finish ServerState interface --- pkg/state/state_postgres.go | 189 +++++++++++++++++++++++++++++++++--- 1 file changed, 174 insertions(+), 15 deletions(-) diff --git a/pkg/state/state_postgres.go b/pkg/state/state_postgres.go index bff1065..bac75e1 100644 --- a/pkg/state/state_postgres.go +++ b/pkg/state/state_postgres.go @@ -55,23 +55,74 @@ func NewPGState() *PGState { func SetupSchemas(pool *pgxpool.Pool) { ctx := context.Background() - const createTable = ` - CREATE TABLE IF NOT EXISTS analyze_results ( - session_id INTEGER NOT NULL, - owner TEXT NOT NULL, - repo TEXT NOT NULL, - result JSONB NOT NULL, - PRIMARY KEY (session_id, owner, repo) - ); - ` - - _, err := pool.Exec(ctx, createTable) - if err != nil { - slog.Error("Failed to create analyze_results table", "error", err) - os.Exit(1) + schemas := []struct { + name string + sql string + }{ + { + name: "session_id_seq", + sql: ` + CREATE SEQUENCE IF NOT EXISTS session_id_seq; + `, + }, + { + name: "analyze_results", + sql: ` + CREATE TABLE IF NOT EXISTS analyze_results ( + session_id INTEGER NOT NULL, + owner TEXT NOT NULL, + repo TEXT NOT NULL, + result JSONB NOT NULL, + PRIMARY KEY (session_id, owner, repo) + ); + `, + }, + { + name: "analyze_jobs", + sql: ` + CREATE TABLE IF NOT EXISTS analyze_jobs ( + session_id INTEGER NOT NULL, + owner TEXT NOT NULL, + repo TEXT NOT NULL, + payload JSONB NOT NULL, + PRIMARY KEY (session_id, owner, repo) + ); + `, + }, + { + name: "job_info", + sql: ` + CREATE TABLE IF NOT EXISTS job_info ( + session_id INTEGER NOT NULL, + owner TEXT NOT NULL, + repo TEXT NOT NULL, + payload JSONB NOT NULL, + PRIMARY KEY (session_id, owner, repo) + ); + `, + }, + { + name: "job_status", + sql: ` + CREATE TABLE IF NOT EXISTS job_status ( + session_id INTEGER NOT NULL, + owner TEXT NOT NULL, + repo TEXT NOT NULL, + status INTEGER NOT NULL, + PRIMARY KEY (session_id, owner, repo) + ); + `, + }, } - slog.Info("Schema initialized: analyze_results") + for _, schema := range schemas { + _, err := pool.Exec(ctx, schema.sql) + if err != nil { + slog.Error("Failed to create table", "table", schema.name, "error", err) + os.Exit(1) + } + slog.Info("Schema initialized", "table", schema.name) + } } // ----- Sequence-based NextID (implements ServerState) @@ -132,3 +183,111 @@ func (s *PGState) GetResult(js common.JobSpec) (queue.AnalyzeResult, error) { return ar, nil } + +func (s *PGState) SetJobInfo(js common.JobSpec, ji common.JobInfo) { + ctx := context.Background() + + jiJSON, err := json.Marshal(ji) + if err != nil { + slog.Error("SetJobInfo: marshal failed", "job", js, "error", err) + panic("SetJobInfo(): " + err.Error()) + } + + _, err = s.pool.Exec(ctx, ` + INSERT INTO job_info (session_id, owner, repo, payload) + VALUES ($1, $2, $3, $4) + ON CONFLICT (session_id, owner, repo) + DO UPDATE SET payload = EXCLUDED.payload + `, js.SessionID, js.Owner, js.Repo, jiJSON) + + if err != nil { + slog.Error("SetJobInfo: insert/update failed", "job", js, "error", err) + panic("SetJobInfo(): " + err.Error()) + } +} + +func (s *PGState) GetJobInfo(js common.JobSpec) (common.JobInfo, error) { + ctx := context.Background() + + var jsonBytes []byte + err := s.pool.QueryRow(ctx, ` + SELECT payload FROM job_info + WHERE session_id = $1 AND owner = $2 AND repo = $3 + `, js.SessionID, js.Owner, js.Repo).Scan(&jsonBytes) + if err != nil { + return common.JobInfo{}, err + } + + var ji common.JobInfo + if err := json.Unmarshal(jsonBytes, &ji); err != nil { + return common.JobInfo{}, fmt.Errorf("unmarshal JobInfo: %w", err) + } + + return ji, nil +} + +func (s *PGState) SetStatus(js common.JobSpec, status common.Status) { + ctx := context.Background() + + _, err := s.pool.Exec(ctx, ` + INSERT INTO job_status (session_id, owner, repo, status) + VALUES ($1, $2, $3, $4) + ON CONFLICT (session_id, owner, repo) + DO UPDATE SET status = EXCLUDED.status + `, js.SessionID, js.Owner, js.Repo, status) + + if err != nil { + slog.Error("SetStatus failed", "job", js, "status", status, "error", err) + panic("SetStatus(): " + err.Error()) + } +} + +func (s *PGState) AddJob(job queue.AnalyzeJob) { + ctx := context.Background() + js := job.Spec + + jobJSON, err := json.Marshal(job) + if err != nil { + slog.Error("AddJob: marshal failed", "job", js, "error", err) + panic("AddJob(): " + err.Error()) + } + + _, err = s.pool.Exec(ctx, ` + INSERT INTO analyze_jobs (session_id, owner, repo, payload) + VALUES ($1, $2, $3, $4) + ON CONFLICT DO NOTHING + `, js.SessionID, js.Owner, js.Repo, jobJSON) + + if err != nil { + slog.Error("AddJob: insert failed", "job", js, "error", err) + panic("AddJob(): " + err.Error()) + } +} + +func (s *PGState) GetJobList(sessionId int) ([]queue.AnalyzeJob, error) { + ctx := context.Background() + + rows, err := s.pool.Query(ctx, ` + SELECT payload FROM analyze_jobs + WHERE session_id = $1 + `, sessionId) + if err != nil { + return nil, err + } + defer rows.Close() + + var jobs []queue.AnalyzeJob + for rows.Next() { + var jsonBytes []byte + if err := rows.Scan(&jsonBytes); err != nil { + return nil, err + } + var job queue.AnalyzeJob + if err := json.Unmarshal(jsonBytes, &job); err != nil { + return nil, err + } + jobs = append(jobs, job) + } + + return jobs, rows.Err() +}