From 8df96738970ca81a10b060b6370b5b029ff52ea8 Mon Sep 17 00:00:00 2001 From: Michael Hohn Date: Thu, 13 Jun 2024 08:46:05 -0700 Subject: [PATCH] wip: Mark update slots with XX:, add pkg/server/container.go --- cmd/server/main.go | 63 +++---- pkg/agent/agent.go | 2 +- pkg/qpstore/container.go | 8 +- pkg/server/container.go | 339 ++++++++++++++++++++++++++++++++++++++ pkg/server/interfaces.go | 11 ++ pkg/server/server.go | 6 +- pkg/server/types.go | 9 + pkg/storage/container.go | 38 ++++- test/storage_json_test.go | 4 +- 9 files changed, 434 insertions(+), 46 deletions(-) create mode 100644 pkg/server/container.go diff --git a/cmd/server/main.go b/cmd/server/main.go index bc4ba5e..c203200 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -68,12 +68,19 @@ func main() { // Apply 'mode' flag switch *mode { case "standalone": + // XX: combine New/Setup functions? + // Assemble single-process version - sq := queue.NewQueueSingle(2) // FIXME take value from configuration - sc := server.NewCommanderSingle() sl := logger.NewLoggerSingle() + sl.Setup(&logger.Visibles{}) + + sq := queue.NewQueueSingle(2) // FIXME take value from configuration + sq.Setup(&queue.Visibles{ + Logger: sl, + }) + ss := storage.NewStorageSingle(config.Storage.StartingID) - sr := agent.NewRunnerSingle(2, sq) // FIXME take value from configuration + ss.Setup(&storage.Visibles{}) qp, err := qpstore.NewStore(config.Storage.StartingID) if err != nil { @@ -81,12 +88,13 @@ func main() { os.Exit(1) } - ql, err := storage.NewQLDBStore() + ql, err := storage.NewQLDBStore(config.Storage.StartingID) if err != nil { slog.Error("Unable to initialize ql database storage") os.Exit(1) } + sc := server.NewCommanderSingle() sc.Setup(&server.Visibles{ Logger: sl, Queue: sq, @@ -95,14 +103,7 @@ func main() { QLDBStore: ql, }) - sl.Setup(&logger.Visibles{}) - - sq.Setup(&queue.Visibles{ - Logger: sl, - }) - - ss.Setup(&storage.Visibles{}) - + sr := agent.NewAgentSingle(2, sq) // FIXME take value from configuration sr.Setup(&agent.Visibles{ Logger: sl, Queue: sq, @@ -111,16 +112,23 @@ func main() { }) case "container": - // Assemble container version - sq := queue.NewQueueSingle(2) // FIXME take value from configuration - sc := server.NewCommanderSingle() - sl := logger.NewLoggerSingle() + // XX: combine New/Setup functions? - ss, err := storage.NewStorageContainer(config.Storage.StartingID) + // Assemble container version + sl := logger.NewLoggerSingle() + sl.Setup(&logger.Visibles{}) + + sq := queue.NewQueueSingle(2) // FIXME take value from configuration + sq.Setup(&queue.Visibles{ + Logger: sl, + }) + + ss, err := storage.NewServerStore(config.Storage.StartingID) if err != nil { slog.Error("Unable to initialize server storage") os.Exit(1) } + ss.Setup(&storage.Visibles{}) qp, err := qpstore.NewStore(config.Storage.StartingID) if err != nil { @@ -128,33 +136,26 @@ func main() { os.Exit(1) } - ql, err := storage.NewQLDBStore() + ql, err := storage.NewQLDBStore(config.Storage.StartingID) if err != nil { slog.Error("Unable to initialize ql database storage") os.Exit(1) } + ql.Setup(&storage.Visibles{}) - sr := agent.NewRunnerSingle(2, sq) // FIXME take value from configuration - - sc.Setup(&server.Visibles{ + sr := agent.NewAgentSingle(2, sq) // FIXME take value from configuration + sr.Setup(&agent.Visibles{ Logger: sl, Queue: sq, - ServerStore: ss, QueryPackStore: qp, QLDBStore: ql, }) - sl.Setup(&logger.Visibles{}) - - sq.Setup(&queue.Visibles{ - Logger: sl, - }) - - ss.Setup(&storage.Visibles{}) - - sr.Setup(&agent.Visibles{ + sc := server.NewCommanderContainer() + sc.Setup(&server.Visibles{ Logger: sl, Queue: sq, + ServerStore: ss, QueryPackStore: qp, QLDBStore: ql, }) diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 3a62fbe..3898960 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -24,7 +24,7 @@ type RunnerSingle struct { queue queue.Queue } -func NewRunnerSingle(numWorkers int, queue queue.Queue) *RunnerSingle { +func NewAgentSingle(numWorkers int, queue queue.Queue) *RunnerSingle { r := RunnerSingle{queue: queue} for id := 1; id <= numWorkers; id++ { diff --git a/pkg/qpstore/container.go b/pkg/qpstore/container.go index 120b805..acecacc 100644 --- a/pkg/qpstore/container.go +++ b/pkg/qpstore/container.go @@ -32,7 +32,7 @@ type DBSpec struct { DBname string } -func (s *StorageContainer) SetupDB() error { +func (s *StorageContainer) SetupTables() error { // TODO set up query pack storage return nil } @@ -84,7 +84,7 @@ func NewStore(startingID int) (Storage, error) { } s := StorageContainer{RequestID: startingID, DB: db} - if err := s.SetupDB(); err != nil { + if err := s.SetupTables(); err != nil { return nil, err } @@ -95,7 +95,7 @@ func NewStore(startingID int) (Storage, error) { return &s, nil } -func NewStorageContainer(startingID int) (*StorageContainer, error) { +func NewServerStore(startingID int) (*StorageContainer, error) { db, err := ConnectDB(DBSpec{ Host: "postgres", @@ -109,7 +109,7 @@ func NewStorageContainer(startingID int) (*StorageContainer, error) { } s := StorageContainer{RequestID: startingID, DB: db} - if err := s.SetupDB(); err != nil { + if err := s.SetupTables(); err != nil { return nil, err } diff --git a/pkg/server/container.go b/pkg/server/container.go new file mode 100644 index 0000000..f8c22b5 --- /dev/null +++ b/pkg/server/container.go @@ -0,0 +1,339 @@ +package server + +import ( + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "io" + "log" + "log/slog" + "mrvacommander/pkg/common" + "mrvacommander/pkg/storage" + "net/http" + "strconv" + "strings" + + "github.com/gorilla/mux" +) + +func (c *CommanderContainer) Setup(st *Visibles) { + c.st = st + setupEndpoints(c) +} + +func (c *CommanderContainer) StatusResponse(w http.ResponseWriter, js common.JobSpec, ji common.JobInfo, vaid int) { + slog.Debug("Submitting status response", "session", vaid) + + all_scanned := []common.ScannedRepo{} + // XX: + jobs := storage.GetJobList(js.JobID) + for _, job := range jobs { + astat := storage.GetStatus(js.JobID, job.ORepo).ToExternalString() + all_scanned = append(all_scanned, + common.ScannedRepo{ + Repository: common.Repository{ + ID: 0, + Name: job.ORepo.Repo, + FullName: fmt.Sprintf("%s/%s", job.ORepo.Owner, job.ORepo.Repo), + Private: false, + StargazersCount: 0, + UpdatedAt: ji.UpdatedAt, + }, + AnalysisStatus: astat, + ResultCount: 123, // FIXME 123 is a lie so the client downloads + ArtifactSizeBytes: 123, // FIXME + }, + ) + } + + // XX: + astat := storage.GetStatus(js.JobID, js.OwnerRepo).ToExternalString() + + status := common.StatusResponse{ + SessionId: js.JobID, + ControllerRepo: common.ControllerRepo{}, + Actor: common.Actor{}, + QueryLanguage: ji.QueryLanguage, + QueryPackURL: "", // FIXME + CreatedAt: ji.CreatedAt, + UpdatedAt: ji.UpdatedAt, + ActionsWorkflowRunID: 0, // FIXME + Status: astat, + ScannedRepositories: all_scanned, + SkippedRepositories: ji.SkippedRepositories, + } + + // Encode the response as JSON + submitStatus, err := json.Marshal(status) + if err != nil { + slog.Error("Error encoding response as JSON:", + "error", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + // Send analysisReposJSON via ResponseWriter + w.Header().Set("Content-Type", "application/json") + w.Write(submitStatus) +} + +func (c *CommanderContainer) RootHandler(w http.ResponseWriter, r *http.Request) { + slog.Info("Request on /") +} + +func (c *CommanderContainer) MirvaStatus(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + slog.Info("mrva status request for ", + "owner", vars["owner"], + "repo", vars["repo"], + "codeql_variant_analysis_id", vars["codeql_variant_analysis_id"]) + id, err := strconv.Atoi(vars["codeql_variant_analysis_id"]) + if err != nil { + slog.Error("Variant analysis is is not integer", "id", + vars["codeql_variant_analysis_id"]) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + // The status reports one status for all jobs belonging to an id. + // So we simply report the status of a job as the status of all. + // XX: + spec := storage.GetJobList(id) + if spec == nil { + msg := "No jobs found for given job id" + slog.Error(msg, "id", vars["codeql_variant_analysis_id"]) + http.Error(w, msg, http.StatusUnprocessableEntity) + return + } + + job := spec[0] + + js := common.JobSpec{ + JobID: job.QueryPackId, + OwnerRepo: job.ORepo, + } + + // XX: + ji := storage.GetJobInfo(js) + + c.StatusResponse(w, js, ji, id) +} + +// Download artifacts +func (c *CommanderContainer) MirvaDownloadArtifact(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + slog.Info("MRVA artifact download", + "controller_owner", vars["controller_owner"], + "controller_repo", vars["controller_repo"], + "codeql_variant_analysis_id", vars["codeql_variant_analysis_id"], + "repo_owner", vars["repo_owner"], + "repo_name", vars["repo_name"], + ) + vaid, err := strconv.Atoi(vars["codeql_variant_analysis_id"]) + if err != nil { + slog.Error("Variant analysis is is not integer", "id", + vars["codeql_variant_analysis_id"]) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + js := common.JobSpec{ + JobID: vaid, + OwnerRepo: common.OwnerRepo{ + Owner: vars["repo_owner"], + Repo: vars["repo_name"], + }, + } + c.DownloadResponse(w, js, vaid) +} + +func (c *CommanderContainer) DownloadResponse(w http.ResponseWriter, js common.JobSpec, vaid int) { + slog.Debug("Forming download response", "session", vaid, "job", js) + + // XX: + astat := storage.GetStatus(vaid, js.OwnerRepo) + + var dlr common.DownloadResponse + if astat == common.StatusSuccess { + + // XX: + au, err := storage.ArtifactURL(js, vaid) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + dlr = common.DownloadResponse{ + Repository: common.DownloadRepo{ + Name: js.Repo, + FullName: fmt.Sprintf("%s/%s", js.Owner, js.Repo), + }, + AnalysisStatus: astat.ToExternalString(), + ResultCount: 123, // FIXME + ArtifactSizeBytes: 123, // FIXME + DatabaseCommitSha: "do-we-use-dcs-p", + SourceLocationPrefix: "do-we-use-slp-p", + ArtifactURL: au, + } + } else { + dlr = common.DownloadResponse{ + Repository: common.DownloadRepo{ + Name: js.Repo, + FullName: fmt.Sprintf("%s/%s", js.Owner, js.Repo), + }, + AnalysisStatus: astat.ToExternalString(), + ResultCount: 0, + ArtifactSizeBytes: 0, + DatabaseCommitSha: "", + SourceLocationPrefix: "/not/relevant/here", + ArtifactURL: "", + } + } + + // Encode the response as JSON + jdlr, err := json.Marshal(dlr) + if err != nil { + slog.Error("Error encoding response as JSON:", + "error", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + // Send analysisReposJSON via ResponseWriter + w.Header().Set("Content-Type", "application/json") + w.Write(jdlr) + +} + +func (c *CommanderContainer) MirvaDownloadServe(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + slog.Info("File download request", "local_path", vars["local_path"]) + + FileDownload(w, vars["local_path"]) +} + +func (c *CommanderContainer) MirvaRequestID(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + slog.Info("New mrva using repository_id=%v\n", vars["repository_id"]) +} + +func (c *CommanderContainer) MirvaRequest(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + slog.Info("New mrva run ", "owner", vars["owner"], "repo", vars["repo"]) + + session_id := c.st.ServerStore.NextID() + session_owner := vars["owner"] + session_controller_repo := vars["repo"] + slog.Info("new run", "id: ", fmt.Sprint(session_id), session_owner, session_controller_repo) + session_language, session_repositories, session_tgz_ref, err := c.collectRequestInfo(w, r, session_id) + if err != nil { + return + } + + not_found_repos, analysisRepos := c.st.ServerStore.FindAvailableDBs(session_repositories) + + c.st.Queue.StartAnalyses(analysisRepos, session_id, session_language) + + si := SessionInfo{ + ID: session_id, + Owner: session_owner, + ControllerRepo: session_controller_repo, + + QueryPack: session_tgz_ref, + Language: session_language, + Repositories: session_repositories, + + AccessMismatchRepos: nil, /* FIXME */ + NotFoundRepos: not_found_repos, + NoCodeqlDBRepos: nil, /* FIXME */ + OverLimitRepos: nil, /* FIXME */ + + AnalysisRepos: analysisRepos, + } + + slog.Debug("Forming and sending response for submitted analysis job", "id", si.ID) + submit_response, err := submit_response(si) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + w.Write(submit_response) +} + +func (c *CommanderContainer) collectRequestInfo(w http.ResponseWriter, r *http.Request, sessionId int) (string, []common.OwnerRepo, string, error) { + slog.Debug("Collecting session info") + + if r.Body == nil { + err := errors.New("missing request body") + log.Println(err) + http.Error(w, err.Error(), http.StatusNoContent) + return "", []common.OwnerRepo{}, "", err + } + buf, err := io.ReadAll(r.Body) + if err != nil { + var w http.ResponseWriter + slog.Error("Error reading MRVA submission body", "error", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return "", []common.OwnerRepo{}, "", err + } + msg, err := TrySubmitMsg(buf) + if err != nil { + // Unknown message + slog.Error("Unknown MRVA submission body format") + http.Error(w, err.Error(), http.StatusBadRequest) + return "", []common.OwnerRepo{}, "", err + } + // Decompose the SubmitMsg and keep information + + // Save the query pack and keep the location + if !isBase64Gzip([]byte(msg.QueryPack)) { + slog.Error("MRVA submission body querypack has invalid format") + err := errors.New("MRVA submission body querypack has invalid format") + http.Error(w, err.Error(), http.StatusBadRequest) + return "", []common.OwnerRepo{}, "", err + } + session_tgz_ref, err := c.extract_tgz(msg.QueryPack, sessionId) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return "", []common.OwnerRepo{}, "", err + } + + // 2. Save the language + session_language := msg.Language + + // 3. Save the repositories + var session_repositories []common.OwnerRepo + + for _, v := range msg.Repositories { + t := strings.Split(v, "/") + if len(t) != 2 { + err := "Invalid owner / repository entry" + slog.Error(err, "entry", t) + http.Error(w, err, http.StatusBadRequest) + } + session_repositories = append(session_repositories, + common.OwnerRepo{Owner: t[0], Repo: t[1]}) + } + return session_language, session_repositories, session_tgz_ref, nil +} + +func (c *CommanderContainer) extract_tgz(qp string, sessionID int) (string, error) { + // These are decoded manually via + // base64 -d < foo1 | gunzip | tar t | head -20 + // base64 decode the body + slog.Debug("Extracting query pack") + + tgz, err := base64.StdEncoding.DecodeString(qp) + if err != nil { + slog.Error("querypack body decoding error:", err) + return "", err + } + + session_query_pack_tgz_filepath, err := c.st.ServerStore.SaveQueryPack(tgz, sessionID) + if err != nil { + return "", err + } + + return session_query_pack_tgz_filepath, err +} diff --git a/pkg/server/interfaces.go b/pkg/server/interfaces.go index 5c814cd..230f4b3 100644 --- a/pkg/server/interfaces.go +++ b/pkg/server/interfaces.go @@ -1,3 +1,14 @@ package server +import "net/http" + type Commander interface{} + +type CommanderAPI interface { + MirvaRequestID(w http.ResponseWriter, r *http.Request) + MirvaRequest(w http.ResponseWriter, r *http.Request) + RootHandler(w http.ResponseWriter, r *http.Request) + MirvaStatus(w http.ResponseWriter, r *http.Request) + MirvaDownloadArtifact(w http.ResponseWriter, r *http.Request) + MirvaDownloadServe(w http.ResponseWriter, r *http.Request) +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 603bb15..3894b15 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -22,8 +22,12 @@ import ( ) func (c *CommanderSingle) Setup(st *Visibles) { - r := mux.NewRouter() c.st = st + setupEndpoints(c) +} + +func setupEndpoints(c CommanderAPI) { + r := mux.NewRouter() // // First are the API endpoints that mirror those used in the github API diff --git a/pkg/server/types.go b/pkg/server/types.go index 927d2c4..59d59f0 100644 --- a/pkg/server/types.go +++ b/pkg/server/types.go @@ -34,6 +34,15 @@ func NewCommanderSingle() *CommanderSingle { return &c } +type CommanderContainer struct { + st *Visibles +} + +func NewCommanderContainer() *CommanderContainer { + c := CommanderContainer{} + return &c +} + // type State struct { // Commander Commander // Logger logger.Logger diff --git a/pkg/storage/container.go b/pkg/storage/container.go index 3ba0214..0115d4b 100644 --- a/pkg/storage/container.go +++ b/pkg/storage/container.go @@ -33,16 +33,38 @@ func (s *StorageContainer) FindAvailableDBs(analysisReposRequested []common.Owne } func (s *StorageContainer) Setup(v *Visibles) { + // TODO XX: set up qldb_db s.modules = v + } -func NewQLDBStore() (*StorageContainer, error) { - // TODO set up qldb_db - return nil, nil +func NewQLDBStore(startingID int) (*StorageContainer, error) { + // TODO drop the startingID + db, err := ConnectDB(DBSpec{ + Host: "postgres", + Port: 5432, + User: "exampleuser", + Password: "examplepass", + DBname: "querypack_db", + }) + if err != nil { + return nil, err + } + + s := StorageContainer{RequestID: startingID, DB: db} + // TODO XX: set up qldb_db tables + if err := s.SetupTables(); err != nil { + return nil, err + } + + if err = s.loadState(); err != nil { + return nil, err + } + + return &s, nil } -func NewStorageContainer(startingID int) (*StorageContainer, error) { - +func NewServerStore(startingID int) (*StorageContainer, error) { db, err := ConnectDB(DBSpec{ Host: "postgres", Port: 5432, @@ -55,7 +77,7 @@ func NewStorageContainer(startingID int) (*StorageContainer, error) { } s := StorageContainer{RequestID: startingID, DB: db} - if err := s.SetupDB(); err != nil { + if err := s.SetupTables(); err != nil { return nil, err } @@ -78,7 +100,7 @@ func ConnectDB(s DBSpec) (*gorm.DB, error) { return db, nil } -func (s *StorageContainer) SetupDB() error { +func (s *StorageContainer) SetupTables() error { msg := "Failed to initialize database " if err := s.DB.AutoMigrate(&DBInfo{}); err != nil { @@ -102,11 +124,13 @@ func (s *StorageContainer) SetupDB() error { } func (s *StorageContainer) loadState() error { + // XX: // TODO load the state return nil } func (s *StorageContainer) hasTables() bool { + // XX: // TODO sql query to check for tables return false } diff --git a/test/storage_json_test.go b/test/storage_json_test.go index 6898083..19abf41 100644 --- a/test/storage_json_test.go +++ b/test/storage_json_test.go @@ -6,7 +6,7 @@ import ( "mrvacommander/pkg/storage" ) -func TestSetupDB(t *testing.T) { +func TestSetupTables(t *testing.T) { db, err := storage.ConnectDB(storage.DBSpec{ Host: "localhost", @@ -22,7 +22,7 @@ func TestSetupDB(t *testing.T) { // Check and set up the database s := storage.StorageContainer{RequestID: 12345, DB: db} - if err := s.SetupDB(); err != nil { + if err := s.SetupTables(); err != nil { t.Errorf("Cannot set up db") }