From 25cab583c19e2d0fea4266e7ef453e8f6c03d9e0 Mon Sep 17 00:00:00 2001 From: Michael Hohn Date: Thu, 6 Jun 2024 13:19:00 -0700 Subject: [PATCH] wip: storage using postgres / gorm using partial json Several approaches of normalizing json were tried and ultimately found impractical at this point. Using a hybrid of tables and json is the current approach; this may be further normalized later. --- pkg/agent/agent.go | 10 +- pkg/common/types.go | 2 +- pkg/queue/queue.go | 2 +- pkg/server/server.go | 10 +- pkg/storage/container.go | 257 +++++--------------------------------- pkg/storage/types.go | 24 ++-- test/commander_test.go | 0 test/logger_test.go | 0 test/queue_test.go | 0 test/runner_test.go | 0 test/storage_json_test.go | 29 +++++ test/test_helpers.go | 0 12 files changed, 88 insertions(+), 246 deletions(-) delete mode 100644 test/commander_test.go delete mode 100644 test/logger_test.go delete mode 100644 test/queue_test.go delete mode 100644 test/runner_test.go create mode 100644 test/storage_json_test.go delete mode 100644 test/test_helpers.go diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 07f1bd4..0ccee85 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -40,7 +40,7 @@ func (r *RunnerSingle) worker(wid int) { slog.Debug("Picking up job", "job", job, "worker", wid) slog.Debug("Analysis: running", "job", job) - storage.SetStatus(job.QueryPackId, job.ORL, common.StatusQueued) + storage.SetStatus(job.QueryPackId, job.ORepo, common.StatusQueued) resultFile, err := r.RunAnalysis(job) if err != nil { @@ -54,8 +54,8 @@ func (r *RunnerSingle) worker(wid int) { RunAnalysisBQRS: "", // FIXME ? } r.queue.Results() <- res - storage.SetStatus(job.QueryPackId, job.ORL, common.StatusSuccess) - storage.SetResult(job.QueryPackId, job.ORL, res) + storage.SetStatus(job.QueryPackId, job.ORepo, common.StatusSuccess) + storage.SetResult(job.QueryPackId, job.ORepo, res) } } @@ -65,7 +65,7 @@ func (r *RunnerSingle) RunAnalysis(job common.AnalyzeJob) (string, error) { // queryPackID, queryLanguage, dbOwner, dbRepo := // job.QueryPackId, job.QueryLanguage, job.ORL.Owner, job.ORL.Repo queryPackID, dbOwner, dbRepo := - job.QueryPackId, job.ORL.Owner, job.ORL.Repo + job.QueryPackId, job.ORepo.Owner, job.ORepo.Repo // FIXME Provide this via environment or explicit argument gmsRoot := "/Users/hohn/work-gh/mrva/mrvacommander/cmd/server" @@ -124,7 +124,7 @@ func (r *RunnerSingle) RunAnalysis(job common.AnalyzeJob) (string, error) { if err := cmd.Run(); err != nil { slog.Error("codeql database analyze failed:", "error", err, "job", job) - storage.SetStatus(job.QueryPackId, job.ORL, common.StatusError) + storage.SetStatus(job.QueryPackId, job.ORepo, common.StatusError) return "", err } diff --git a/pkg/common/types.go b/pkg/common/types.go index b9f34a7..de7575f 100644 --- a/pkg/common/types.go +++ b/pkg/common/types.go @@ -6,7 +6,7 @@ type AnalyzeJob struct { QueryPackId int QueryLanguage string - ORL OwnerRepo + ORepo OwnerRepo } type OwnerRepo struct { diff --git a/pkg/queue/queue.go b/pkg/queue/queue.go index d115a49..c228965 100644 --- a/pkg/queue/queue.go +++ b/pkg/queue/queue.go @@ -23,7 +23,7 @@ func (q *QueueSingle) StartAnalyses(analysis_repos *map[common.OwnerRepo]storage QueryPackId: session_id, QueryLanguage: session_language, - ORL: orl, + ORepo: orl, } q.jobs <- info storage.SetStatus(session_id, orl, common.StatusQueued) diff --git a/pkg/server/server.go b/pkg/server/server.go index f9ecc5e..1bc72ae 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -64,13 +64,13 @@ func (c *CommanderSingle) StatusResponse(w http.ResponseWriter, js common.JobSpe all_scanned := []common.ScannedRepo{} jobs := storage.GetJobList(js.JobID) for _, job := range jobs { - astat := storage.GetStatus(js.JobID, job.ORL).ToExternalString() + astat := storage.GetStatus(js.JobID, job.ORepo).ToExternalString() all_scanned = append(all_scanned, common.ScannedRepo{ Repository: common.Repository{ ID: 0, - Name: job.ORL.Repo, - FullName: fmt.Sprintf("%s/%s", job.ORL.Owner, job.ORL.Repo), + Name: job.ORepo.Repo, + FullName: fmt.Sprintf("%s/%s", job.ORepo.Owner, job.ORepo.Repo), Private: false, StargazersCount: 0, UpdatedAt: ji.UpdatedAt, @@ -143,7 +143,7 @@ func (c *CommanderSingle) MirvaStatus(w http.ResponseWriter, r *http.Request) { js := common.JobSpec{ JobID: job.QueryPackId, - OwnerRepo: job.ORL, + OwnerRepo: job.ORepo, } ji := storage.GetJobInfo(js) @@ -367,7 +367,7 @@ func submit_response(sn SessionInfo) ([]byte, error) { for _, job := range joblist { storage.SetJobInfo(common.JobSpec{ JobID: sn.ID, - OwnerRepo: job.ORL, + OwnerRepo: job.ORepo, }, common.JobInfo{ QueryLanguage: sn.Language, CreatedAt: m_sr.CreatedAt, diff --git a/pkg/storage/container.go b/pkg/storage/container.go index a693c65..fe83190 100644 --- a/pkg/storage/container.go +++ b/pkg/storage/container.go @@ -33,43 +33,58 @@ func (s *StorageContainer) FindAvailableDBs(analysisReposRequested []common.Owne } func NewStorageContainer(startingID int) (*StorageContainer, error) { - // Set up the database connection string - const ( - host = "postgres" - port = 5432 - user = "exampleuser" - password = "examplepass" - dbname = "exampledb" - ) - // Open the database connection - dsn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", - host, port, user, password, dbname) - db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{}) + db, err := ConnectDB(DBSpec{ + Host: "postgres", + Port: 5432, + User: "exampleuser", + Password: "examplepass", + DBname: "exampledb", + }) + if err != nil { - slog.Error("Error connecting to the database", "err", err) return nil, err } + s, err := LoadOrInit(db, startingID) + if err != nil { + return nil, err + } + return s, nil + +} + +func LoadOrInit(db *gorm.DB, startingID int) (*StorageContainer, error) { // Check and set up the database s := StorageContainer{RequestID: startingID, DB: db} if s.hasTables() { s.loadState() } else { - if err = s.setupDB(); err != nil { + if err := s.SetupDB(); err != nil { return nil, err } s.setFresh() } - return &s, nil } +func ConnectDB(s DBSpec) (*gorm.DB, error) { + // Open the database connection + dsn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", + s.Host, s.Port, s.User, s.Password, s.DBname) + db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{}) + if err != nil { + slog.Error("Error connecting to the database", "err", err) + return nil, err + } + return db, nil +} + func (s *StorageContainer) setFresh() { // TODO Set initial state } -func (s *StorageContainer) setupDB() error { +func (s *StorageContainer) SetupDB() error { // TODO Migrate the schemas msg := "Failed to initialize database " @@ -102,213 +117,3 @@ func (s *StorageContainer) hasTables() bool { // TODO sql query to check for tables return false } - -// ================ TODO migrate - -// func (s *StorageSingle) NextID() int { -// s.RequestID += 1 -// return s.RequestID -// } - -// func (s *StorageSingle) SaveQueryPack(tgz []byte, sessionId int) (string, error) { -// // Save the tar.gz body -// cwd, err := os.Getwd() -// if err != nil { -// slog.Error("No working directory") -// panic(err) -// } - -// dirpath := path.Join(cwd, "var", "codeql", "querypacks") -// if err := os.MkdirAll(dirpath, 0755); err != nil { -// slog.Error("Unable to create query pack output directory", -// "dir", dirpath) -// return "", err -// } - -// fpath := path.Join(dirpath, fmt.Sprintf("qp-%d.tgz", sessionId)) -// err = os.WriteFile(fpath, tgz, 0644) -// if err != nil { -// slog.Error("unable to save querypack body decoding error", "path", fpath) -// return "", err -// } else { -// slog.Info("Query pack saved to ", "path", fpath) -// } - -// return fpath, nil -// } - -// // Determine for which repositories codeql databases are available. -// // -// // Those will be the analysis_repos. The rest will be skipped. -// func (s *StorageSingle) FindAvailableDBs(analysisReposRequested []common.OwnerRepo) (notFoundRepos []common.OwnerRepo, -// analysisRepos *map[common.OwnerRepo]DBLocation) { -// slog.Debug("Looking for available CodeQL databases") - -// cwd, err := os.Getwd() -// if err != nil { -// slog.Error("No working directory") -// return -// } - -// analysisRepos = &map[common.OwnerRepo]DBLocation{} - -// notFoundRepos = []common.OwnerRepo{} - -// for _, rep := range analysisReposRequested { -// dbPrefix := filepath.Join(cwd, "codeql", "dbs", rep.Owner, rep.Repo) -// dbName := fmt.Sprintf("%s_%s_db.zip", rep.Owner, rep.Repo) -// dbPath := filepath.Join(dbPrefix, dbName) - -// if _, err := os.Stat(dbPath); errors.Is(err, fs.ErrNotExist) { -// slog.Info("Database does not exist for repository ", "owner/repo", rep, -// "path", dbPath) -// notFoundRepos = append(notFoundRepos, rep) -// } else { -// slog.Info("Found database for ", "owner/repo", rep, "path", dbPath) -// (*analysisRepos)[rep] = DBLocation{Prefix: dbPrefix, File: dbName} -// } -// } -// return notFoundRepos, analysisRepos -// } - -// func ArtifactURL(js common.JobSpec, vaid int) (string, error) { -// // We're looking for paths like -// // codeql/sarif/google/flatbuffers/google_flatbuffers.sarif - -// ar := GetResult(js) - -// hostname, err := os.Hostname() -// if err != nil { -// slog.Error("No host name found") -// return "", nil -// } - -// zfpath, err := PackageResults(ar, js.OwnerRepo, vaid) -// if err != nil { -// slog.Error("Error packaging results:", "error", err) -// return "", err -// } -// au := fmt.Sprintf("http://%s:8080/download-server/%s", hostname, zfpath) -// return au, nil -// } - -// func GetResult(js common.JobSpec) common.AnalyzeResult { -// mutex.Lock() -// defer mutex.Unlock() -// ar := result[js] -// return ar -// } - -// func SetResult(sessionid int, orl common.OwnerRepo, ar common.AnalyzeResult) { -// mutex.Lock() -// defer mutex.Unlock() -// result[common.JobSpec{RequestID: sessionid, OwnerRepo: orl}] = ar -// } - -// func PackageResults(ar common.AnalyzeResult, owre common.OwnerRepo, vaid int) (zipPath string, e error) { -// slog.Debug("Readying zip file with .sarif/.bqrs", "analyze-result", ar) - -// cwd, err := os.Getwd() -// if err != nil { -// slog.Error("No working directory") -// panic(err) -// } - -// // Ensure the output directory exists -// dirpath := path.Join(cwd, "var", "codeql", "localrun", "results") -// if err := os.MkdirAll(dirpath, 0755); err != nil { -// slog.Error("Unable to create results output directory", -// "dir", dirpath) -// return "", err -// } - -// // Create a new zip file -// zpath := path.Join(dirpath, fmt.Sprintf("results-%s-%s-%d.zip", owre.Owner, owre.Repo, vaid)) - -// zfile, err := os.Create(zpath) -// if err != nil { -// return "", err -// } -// defer zfile.Close() - -// // Create a new zip writer -// zwriter := zip.NewWriter(zfile) -// defer zwriter.Close() - -// // Add each result file to the zip archive -// names := []([]string){{ar.RunAnalysisSARIF, "results.sarif"}} -// for _, fpath := range names { -// file, err := os.Open(fpath[0]) -// if err != nil { -// return "", err -// } -// defer file.Close() - -// // Create a new file in the zip archive with custom name -// // The client is very specific: -// // if zf.Name != "results.sarif" && zf.Name != "results.bqrs" { continue } - -// zipEntry, err := zwriter.Create(fpath[1]) -// if err != nil { -// return "", err -// } - -// // Copy the contents of the file to the zip entry -// _, err = io.Copy(zipEntry, file) -// if err != nil { -// return "", err -// } -// } -// return zpath, nil -// } - -// func GetJobList(sessionid int) []common.AnalyzeJob { -// mutex.Lock() -// defer mutex.Unlock() -// return jobs[sessionid] -// } - -// func GetJobInfo(js common.JobSpec) common.JobInfo { -// mutex.Lock() -// defer mutex.Unlock() -// return info[js] -// } - -// func SetJobInfo(js common.JobSpec, ji common.JobInfo) { -// mutex.Lock() -// defer mutex.Unlock() -// info[js] = ji -// } - -// func GetStatus(sessionid int, orl common.OwnerRepo) common.Status { -// mutex.Lock() -// defer mutex.Unlock() -// return status[common.JobSpec{RequestID: sessionid, OwnerRepo: orl}] -// } - -// func ResultAsFile(path string) (string, []byte, error) { -// fpath := path -// if !filepath.IsAbs(path) { -// fpath = "/" + path -// } - -// file, err := os.ReadFile(fpath) -// if err != nil { -// slog.Warn("Failed to read results file", fpath, err) -// return "", nil, err -// } - -// return fpath, file, nil -// } - -// func SetStatus(sessionid int, orl common.OwnerRepo, s common.Status) { -// mutex.Lock() -// defer mutex.Unlock() -// status[common.JobSpec{RequestID: sessionid, OwnerRepo: orl}] = s -// } - -// func AddJob(sessionid int, job common.AnalyzeJob) { -// mutex.Lock() -// defer mutex.Unlock() -// jobs[sessionid] = append(jobs[sessionid], job) -// } diff --git a/pkg/storage/types.go b/pkg/storage/types.go index b1c5509..77346fc 100644 --- a/pkg/storage/types.go +++ b/pkg/storage/types.go @@ -15,36 +15,44 @@ type StorageSingle struct { currentID int } +type DBSpec struct { + Host string + Port int + User string + Password string + DBname string +} + type DBInfo struct { // Database version of // info map[common.JobSpec]common.JobInfo = make(map[common.JobSpec]common.JobInfo) gorm.Model - Key common.JobSpec - JobInfo common.JobInfo + JobSpec common.JobSpec `gorm:"type:jsonb"` + JobInfo common.JobInfo `gorm:"type:jsonb"` } type DBJobs struct { // Database version of // jobs map[int][]common.AnalyzeJob = make(map[int][]common.AnalyzeJob) gorm.Model - Key int - AnalyzeJob common.AnalyzeJob + JobKey int + AnalyzeJob common.AnalyzeJob `gorm:"type:jsonb"` } type DBResult struct { // Database version of // result map[common.JobSpec]common.AnalyzeResult = make(map[common.JobSpec]common.AnalyzeResult) gorm.Model - Key common.JobSpec - AnalyzeResult common.AnalyzeResult + JobSpec common.JobSpec `gorm:"type:jsonb"` + AnalyzeResult common.AnalyzeResult `gorm:"type:jsonb"` } type DBStatus struct { // Database version of // status map[common.JobSpec]common.Status = make(map[common.JobSpec]common.Status) gorm.Model - Key common.JobSpec - Status common.Status + JobSpec common.JobSpec `gorm:"type:jsonb"` + Status common.Status `gorm:"type:jsonb"` } type StorageContainer struct { diff --git a/test/commander_test.go b/test/commander_test.go deleted file mode 100644 index e69de29..0000000 diff --git a/test/logger_test.go b/test/logger_test.go deleted file mode 100644 index e69de29..0000000 diff --git a/test/queue_test.go b/test/queue_test.go deleted file mode 100644 index e69de29..0000000 diff --git a/test/runner_test.go b/test/runner_test.go deleted file mode 100644 index e69de29..0000000 diff --git a/test/storage_json_test.go b/test/storage_json_test.go new file mode 100644 index 0000000..6898083 --- /dev/null +++ b/test/storage_json_test.go @@ -0,0 +1,29 @@ +package main + +import ( + "testing" + + "mrvacommander/pkg/storage" +) + +func TestSetupDB(t *testing.T) { + + db, err := storage.ConnectDB(storage.DBSpec{ + Host: "localhost", + Port: 5432, + User: "exampleuser", + Password: "examplepass", + DBname: "exampledb", + }) + + if err != nil { + t.Errorf("Cannot connect to db") + } + + // Check and set up the database + s := storage.StorageContainer{RequestID: 12345, DB: db} + if err := s.SetupDB(); err != nil { + t.Errorf("Cannot set up db") + } + +} diff --git a/test/test_helpers.go b/test/test_helpers.go deleted file mode 100644 index e69de29..0000000