diff --git a/pkg/state/state_postgres.go b/pkg/state/state_postgres.go index ebdcb75..bff1065 100644 --- a/pkg/state/state_postgres.go +++ b/pkg/state/state_postgres.go @@ -2,9 +2,13 @@ package state import ( "context" + "encoding/json" + "fmt" "log/slog" "os" + "github.com/hohn/mrvacommander/pkg/common" + "github.com/hohn/mrvacommander/pkg/queue" "github.com/jackc/pgx/v5/pgxpool" ) @@ -41,9 +45,35 @@ func NewPGState() *PGState { } slog.Info("Connected to Postgres", "max_conns", config.MaxConns) + + // schema initialization + SetupSchemas(pool) + return &PGState{pool: pool} } +func SetupSchemas(pool *pgxpool.Pool) { + ctx := context.Background() + + const createTable = ` + CREATE TABLE IF NOT EXISTS analyze_results ( + session_id INTEGER NOT NULL, + owner TEXT NOT NULL, + repo TEXT NOT NULL, + result JSONB NOT NULL, + PRIMARY KEY (session_id, owner, repo) + ); + ` + + _, err := pool.Exec(ctx, createTable) + if err != nil { + slog.Error("Failed to create analyze_results table", "error", err) + os.Exit(1) + } + + slog.Info("Schema initialized: analyze_results") +} + // ----- Sequence-based NextID (implements ServerState) func (s *PGState) NextID() int { ctx := context.Background() @@ -58,3 +88,47 @@ func (s *PGState) NextID() int { slog.Debug("NextID generated", "id", id) return id } + +func (s *PGState) SetResult(js common.JobSpec, ar queue.AnalyzeResult) { + ctx := context.Background() + + ar.Spec = js // ensure internal consistency + + jsonBytes, err := json.Marshal(ar) + if err != nil { + slog.Error("SetResult: JSON marshal failed", "job", js, "error", err) + panic("SetResult(): " + err.Error()) + } + + _, err = s.pool.Exec(ctx, ` + INSERT INTO analyze_results (session_id, owner, repo, result) + VALUES ($1, $2, $3, $4) + ON CONFLICT (session_id, owner, repo) + DO UPDATE SET result = EXCLUDED.result + `, js.SessionID, js.Owner, js.Repo, jsonBytes) + + if err != nil { + slog.Error("SetResult: insert/update failed", "job", js, "error", err) + panic("SetResult(): " + err.Error()) + } +} + +func (s *PGState) GetResult(js common.JobSpec) (queue.AnalyzeResult, error) { + ctx := context.Background() + + var jsonBytes []byte + err := s.pool.QueryRow(ctx, ` + SELECT result FROM analyze_results + WHERE session_id = $1 AND owner = $2 AND repo = $3 + `, js.SessionID, js.Owner, js.Repo).Scan(&jsonBytes) + if err != nil { + return queue.AnalyzeResult{}, err + } + + var ar queue.AnalyzeResult + if err := json.Unmarshal(jsonBytes, &ar); err != nil { + return queue.AnalyzeResult{}, fmt.Errorf("unmarshal AnalyzeResult: %w", err) + } + + return ar, nil +}