wip: storage using postgres / gorm using partial json

Several approaches to normalizing JSON were tried and ultimately found
impractical at this point.

Using a hybrid of tables and json is the current approach; this may be
further normalized later.
This commit is contained in:
Michael Hohn
2024-06-06 13:19:00 -07:00
committed by Michael Hohn
parent 593644ca2e
commit 25cab583c1
12 changed files with 88 additions and 246 deletions

View File

@@ -40,7 +40,7 @@ func (r *RunnerSingle) worker(wid int) {
slog.Debug("Picking up job", "job", job, "worker", wid) slog.Debug("Picking up job", "job", job, "worker", wid)
slog.Debug("Analysis: running", "job", job) slog.Debug("Analysis: running", "job", job)
storage.SetStatus(job.QueryPackId, job.ORL, common.StatusQueued) storage.SetStatus(job.QueryPackId, job.ORepo, common.StatusQueued)
resultFile, err := r.RunAnalysis(job) resultFile, err := r.RunAnalysis(job)
if err != nil { if err != nil {
@@ -54,8 +54,8 @@ func (r *RunnerSingle) worker(wid int) {
RunAnalysisBQRS: "", // FIXME ? RunAnalysisBQRS: "", // FIXME ?
} }
r.queue.Results() <- res r.queue.Results() <- res
storage.SetStatus(job.QueryPackId, job.ORL, common.StatusSuccess) storage.SetStatus(job.QueryPackId, job.ORepo, common.StatusSuccess)
storage.SetResult(job.QueryPackId, job.ORL, res) storage.SetResult(job.QueryPackId, job.ORepo, res)
} }
} }
@@ -65,7 +65,7 @@ func (r *RunnerSingle) RunAnalysis(job common.AnalyzeJob) (string, error) {
// queryPackID, queryLanguage, dbOwner, dbRepo := // queryPackID, queryLanguage, dbOwner, dbRepo :=
// job.QueryPackId, job.QueryLanguage, job.ORL.Owner, job.ORL.Repo // job.QueryPackId, job.QueryLanguage, job.ORL.Owner, job.ORL.Repo
queryPackID, dbOwner, dbRepo := queryPackID, dbOwner, dbRepo :=
job.QueryPackId, job.ORL.Owner, job.ORL.Repo job.QueryPackId, job.ORepo.Owner, job.ORepo.Repo
// FIXME Provide this via environment or explicit argument // FIXME Provide this via environment or explicit argument
gmsRoot := "/Users/hohn/work-gh/mrva/mrvacommander/cmd/server" gmsRoot := "/Users/hohn/work-gh/mrva/mrvacommander/cmd/server"
@@ -124,7 +124,7 @@ func (r *RunnerSingle) RunAnalysis(job common.AnalyzeJob) (string, error) {
if err := cmd.Run(); err != nil { if err := cmd.Run(); err != nil {
slog.Error("codeql database analyze failed:", "error", err, "job", job) slog.Error("codeql database analyze failed:", "error", err, "job", job)
storage.SetStatus(job.QueryPackId, job.ORL, common.StatusError) storage.SetStatus(job.QueryPackId, job.ORepo, common.StatusError)
return "", err return "", err
} }

View File

@@ -6,7 +6,7 @@ type AnalyzeJob struct {
QueryPackId int QueryPackId int
QueryLanguage string QueryLanguage string
ORL OwnerRepo ORepo OwnerRepo
} }
type OwnerRepo struct { type OwnerRepo struct {

View File

@@ -23,7 +23,7 @@ func (q *QueueSingle) StartAnalyses(analysis_repos *map[common.OwnerRepo]storage
QueryPackId: session_id, QueryPackId: session_id,
QueryLanguage: session_language, QueryLanguage: session_language,
ORL: orl, ORepo: orl,
} }
q.jobs <- info q.jobs <- info
storage.SetStatus(session_id, orl, common.StatusQueued) storage.SetStatus(session_id, orl, common.StatusQueued)

View File

@@ -64,13 +64,13 @@ func (c *CommanderSingle) StatusResponse(w http.ResponseWriter, js common.JobSpe
all_scanned := []common.ScannedRepo{} all_scanned := []common.ScannedRepo{}
jobs := storage.GetJobList(js.JobID) jobs := storage.GetJobList(js.JobID)
for _, job := range jobs { for _, job := range jobs {
astat := storage.GetStatus(js.JobID, job.ORL).ToExternalString() astat := storage.GetStatus(js.JobID, job.ORepo).ToExternalString()
all_scanned = append(all_scanned, all_scanned = append(all_scanned,
common.ScannedRepo{ common.ScannedRepo{
Repository: common.Repository{ Repository: common.Repository{
ID: 0, ID: 0,
Name: job.ORL.Repo, Name: job.ORepo.Repo,
FullName: fmt.Sprintf("%s/%s", job.ORL.Owner, job.ORL.Repo), FullName: fmt.Sprintf("%s/%s", job.ORepo.Owner, job.ORepo.Repo),
Private: false, Private: false,
StargazersCount: 0, StargazersCount: 0,
UpdatedAt: ji.UpdatedAt, UpdatedAt: ji.UpdatedAt,
@@ -143,7 +143,7 @@ func (c *CommanderSingle) MirvaStatus(w http.ResponseWriter, r *http.Request) {
js := common.JobSpec{ js := common.JobSpec{
JobID: job.QueryPackId, JobID: job.QueryPackId,
OwnerRepo: job.ORL, OwnerRepo: job.ORepo,
} }
ji := storage.GetJobInfo(js) ji := storage.GetJobInfo(js)
@@ -367,7 +367,7 @@ func submit_response(sn SessionInfo) ([]byte, error) {
for _, job := range joblist { for _, job := range joblist {
storage.SetJobInfo(common.JobSpec{ storage.SetJobInfo(common.JobSpec{
JobID: sn.ID, JobID: sn.ID,
OwnerRepo: job.ORL, OwnerRepo: job.ORepo,
}, common.JobInfo{ }, common.JobInfo{
QueryLanguage: sn.Language, QueryLanguage: sn.Language,
CreatedAt: m_sr.CreatedAt, CreatedAt: m_sr.CreatedAt,

View File

@@ -33,43 +33,58 @@ func (s *StorageContainer) FindAvailableDBs(analysisReposRequested []common.Owne
} }
func NewStorageContainer(startingID int) (*StorageContainer, error) { func NewStorageContainer(startingID int) (*StorageContainer, error) {
// Set up the database connection string
const (
host = "postgres"
port = 5432
user = "exampleuser"
password = "examplepass"
dbname = "exampledb"
)
// Open the database connection db, err := ConnectDB(DBSpec{
dsn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", Host: "postgres",
host, port, user, password, dbname) Port: 5432,
db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{}) User: "exampleuser",
Password: "examplepass",
DBname: "exampledb",
})
if err != nil { if err != nil {
slog.Error("Error connecting to the database", "err", err)
return nil, err return nil, err
} }
s, err := LoadOrInit(db, startingID)
if err != nil {
return nil, err
}
return s, nil
}
func LoadOrInit(db *gorm.DB, startingID int) (*StorageContainer, error) {
// Check and set up the database // Check and set up the database
s := StorageContainer{RequestID: startingID, DB: db} s := StorageContainer{RequestID: startingID, DB: db}
if s.hasTables() { if s.hasTables() {
s.loadState() s.loadState()
} else { } else {
if err = s.setupDB(); err != nil { if err := s.SetupDB(); err != nil {
return nil, err return nil, err
} }
s.setFresh() s.setFresh()
} }
return &s, nil return &s, nil
} }
// ConnectDB opens a GORM connection to the PostgreSQL database described
// by s. On failure the returned error is wrapped with the target
// database/host/port so callers can tell which DSN failed.
func ConnectDB(s DBSpec) (*gorm.DB, error) {
	// Assemble the connection string from the DBSpec fields.
	dsn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable",
		s.Host, s.Port, s.User, s.Password, s.DBname)
	db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
	if err != nil {
		// Wrap with context rather than logging and returning the same
		// error (handle it at exactly one layer); the caller decides
		// whether and how to report it.
		return nil, fmt.Errorf("connecting to database %q on %s:%d: %w",
			s.DBname, s.Host, s.Port, err)
	}
	return db, nil
}
func (s *StorageContainer) setFresh() { func (s *StorageContainer) setFresh() {
// TODO Set initial state // TODO Set initial state
} }
func (s *StorageContainer) setupDB() error { func (s *StorageContainer) SetupDB() error {
// TODO Migrate the schemas // TODO Migrate the schemas
msg := "Failed to initialize database " msg := "Failed to initialize database "
@@ -102,213 +117,3 @@ func (s *StorageContainer) hasTables() bool {
// TODO sql query to check for tables // TODO sql query to check for tables
return false return false
} }
// ================ TODO migrate
// func (s *StorageSingle) NextID() int {
// s.RequestID += 1
// return s.RequestID
// }
// func (s *StorageSingle) SaveQueryPack(tgz []byte, sessionId int) (string, error) {
// // Save the tar.gz body
// cwd, err := os.Getwd()
// if err != nil {
// slog.Error("No working directory")
// panic(err)
// }
// dirpath := path.Join(cwd, "var", "codeql", "querypacks")
// if err := os.MkdirAll(dirpath, 0755); err != nil {
// slog.Error("Unable to create query pack output directory",
// "dir", dirpath)
// return "", err
// }
// fpath := path.Join(dirpath, fmt.Sprintf("qp-%d.tgz", sessionId))
// err = os.WriteFile(fpath, tgz, 0644)
// if err != nil {
// slog.Error("unable to save querypack body decoding error", "path", fpath)
// return "", err
// } else {
// slog.Info("Query pack saved to ", "path", fpath)
// }
// return fpath, nil
// }
// // Determine for which repositories codeql databases are available.
// //
// // Those will be the analysis_repos. The rest will be skipped.
// func (s *StorageSingle) FindAvailableDBs(analysisReposRequested []common.OwnerRepo) (notFoundRepos []common.OwnerRepo,
// analysisRepos *map[common.OwnerRepo]DBLocation) {
// slog.Debug("Looking for available CodeQL databases")
// cwd, err := os.Getwd()
// if err != nil {
// slog.Error("No working directory")
// return
// }
// analysisRepos = &map[common.OwnerRepo]DBLocation{}
// notFoundRepos = []common.OwnerRepo{}
// for _, rep := range analysisReposRequested {
// dbPrefix := filepath.Join(cwd, "codeql", "dbs", rep.Owner, rep.Repo)
// dbName := fmt.Sprintf("%s_%s_db.zip", rep.Owner, rep.Repo)
// dbPath := filepath.Join(dbPrefix, dbName)
// if _, err := os.Stat(dbPath); errors.Is(err, fs.ErrNotExist) {
// slog.Info("Database does not exist for repository ", "owner/repo", rep,
// "path", dbPath)
// notFoundRepos = append(notFoundRepos, rep)
// } else {
// slog.Info("Found database for ", "owner/repo", rep, "path", dbPath)
// (*analysisRepos)[rep] = DBLocation{Prefix: dbPrefix, File: dbName}
// }
// }
// return notFoundRepos, analysisRepos
// }
// func ArtifactURL(js common.JobSpec, vaid int) (string, error) {
// // We're looking for paths like
// // codeql/sarif/google/flatbuffers/google_flatbuffers.sarif
// ar := GetResult(js)
// hostname, err := os.Hostname()
// if err != nil {
// slog.Error("No host name found")
// return "", nil
// }
// zfpath, err := PackageResults(ar, js.OwnerRepo, vaid)
// if err != nil {
// slog.Error("Error packaging results:", "error", err)
// return "", err
// }
// au := fmt.Sprintf("http://%s:8080/download-server/%s", hostname, zfpath)
// return au, nil
// }
// func GetResult(js common.JobSpec) common.AnalyzeResult {
// mutex.Lock()
// defer mutex.Unlock()
// ar := result[js]
// return ar
// }
// func SetResult(sessionid int, orl common.OwnerRepo, ar common.AnalyzeResult) {
// mutex.Lock()
// defer mutex.Unlock()
// result[common.JobSpec{RequestID: sessionid, OwnerRepo: orl}] = ar
// }
// func PackageResults(ar common.AnalyzeResult, owre common.OwnerRepo, vaid int) (zipPath string, e error) {
// slog.Debug("Readying zip file with .sarif/.bqrs", "analyze-result", ar)
// cwd, err := os.Getwd()
// if err != nil {
// slog.Error("No working directory")
// panic(err)
// }
// // Ensure the output directory exists
// dirpath := path.Join(cwd, "var", "codeql", "localrun", "results")
// if err := os.MkdirAll(dirpath, 0755); err != nil {
// slog.Error("Unable to create results output directory",
// "dir", dirpath)
// return "", err
// }
// // Create a new zip file
// zpath := path.Join(dirpath, fmt.Sprintf("results-%s-%s-%d.zip", owre.Owner, owre.Repo, vaid))
// zfile, err := os.Create(zpath)
// if err != nil {
// return "", err
// }
// defer zfile.Close()
// // Create a new zip writer
// zwriter := zip.NewWriter(zfile)
// defer zwriter.Close()
// // Add each result file to the zip archive
// names := []([]string){{ar.RunAnalysisSARIF, "results.sarif"}}
// for _, fpath := range names {
// file, err := os.Open(fpath[0])
// if err != nil {
// return "", err
// }
// defer file.Close()
// // Create a new file in the zip archive with custom name
// // The client is very specific:
// // if zf.Name != "results.sarif" && zf.Name != "results.bqrs" { continue }
// zipEntry, err := zwriter.Create(fpath[1])
// if err != nil {
// return "", err
// }
// // Copy the contents of the file to the zip entry
// _, err = io.Copy(zipEntry, file)
// if err != nil {
// return "", err
// }
// }
// return zpath, nil
// }
// func GetJobList(sessionid int) []common.AnalyzeJob {
// mutex.Lock()
// defer mutex.Unlock()
// return jobs[sessionid]
// }
// func GetJobInfo(js common.JobSpec) common.JobInfo {
// mutex.Lock()
// defer mutex.Unlock()
// return info[js]
// }
// func SetJobInfo(js common.JobSpec, ji common.JobInfo) {
// mutex.Lock()
// defer mutex.Unlock()
// info[js] = ji
// }
// func GetStatus(sessionid int, orl common.OwnerRepo) common.Status {
// mutex.Lock()
// defer mutex.Unlock()
// return status[common.JobSpec{RequestID: sessionid, OwnerRepo: orl}]
// }
// func ResultAsFile(path string) (string, []byte, error) {
// fpath := path
// if !filepath.IsAbs(path) {
// fpath = "/" + path
// }
// file, err := os.ReadFile(fpath)
// if err != nil {
// slog.Warn("Failed to read results file", fpath, err)
// return "", nil, err
// }
// return fpath, file, nil
// }
// func SetStatus(sessionid int, orl common.OwnerRepo, s common.Status) {
// mutex.Lock()
// defer mutex.Unlock()
// status[common.JobSpec{RequestID: sessionid, OwnerRepo: orl}] = s
// }
// func AddJob(sessionid int, job common.AnalyzeJob) {
// mutex.Lock()
// defer mutex.Unlock()
// jobs[sessionid] = append(jobs[sessionid], job)
// }

View File

@@ -15,36 +15,44 @@ type StorageSingle struct {
currentID int currentID int
} }
// DBSpec holds the connection parameters for a PostgreSQL database.
// It is consumed by ConnectDB, which assembles these fields into a DSN.
type DBSpec struct {
	Host     string // database host name (e.g. "postgres" or "localhost")
	Port     int    // TCP port, normally 5432
	User     string
	Password string
	DBname   string // name of the database to open
}
// DBInfo is the database (GORM) representation of the former in-memory
// map:
//
//	info map[common.JobSpec]common.JobInfo
//
// Both non-model fields are stored as jsonb columns; this assumes
// common.JobSpec and common.JobInfo (de)serialize to JSON — presumably
// via Valuer/Scanner implementations; TODO confirm.
type DBInfo struct {
	gorm.Model
	JobSpec common.JobSpec `gorm:"type:jsonb"`
	JobInfo common.JobInfo `gorm:"type:jsonb"`
}
// DBJobs is the database (GORM) representation of the former in-memory
// map:
//
//	jobs map[int][]common.AnalyzeJob
//
// One row per job: JobKey corresponds to the map's int key (the
// session/request id per the commented-out GetJobList/AddJob code), so a
// key's job list is the set of rows sharing a JobKey. AnalyzeJob is
// stored as a jsonb column.
type DBJobs struct {
	gorm.Model
	JobKey     int
	AnalyzeJob common.AnalyzeJob `gorm:"type:jsonb"`
}
// DBResult is the database (GORM) representation of the former in-memory
// map:
//
//	result map[common.JobSpec]common.AnalyzeResult
//
// Both non-model fields are stored as jsonb columns.
type DBResult struct {
	gorm.Model
	JobSpec       common.JobSpec       `gorm:"type:jsonb"`
	AnalyzeResult common.AnalyzeResult `gorm:"type:jsonb"`
}
// DBStatus is the database (GORM) representation of the former in-memory
// map:
//
//	status map[common.JobSpec]common.Status
//
// NOTE(review): Status is tagged jsonb, but common.Status looks like a
// simple scalar elsewhere in this diff — confirm jsonb is the intended
// column type.
type DBStatus struct {
	gorm.Model
	JobSpec common.JobSpec `gorm:"type:jsonb"`
	Status  common.Status  `gorm:"type:jsonb"`
}
type StorageContainer struct { type StorageContainer struct {

View File

View File

View File

View File

29
test/storage_json_test.go Normal file
View File

@@ -0,0 +1,29 @@
package main
import (
"testing"
"mrvacommander/pkg/storage"
)
// TestSetupDB connects to a locally running Postgres instance and
// verifies that schema setup succeeds. The test is skipped when no
// database is reachable so it does not fail on machines without one.
func TestSetupDB(t *testing.T) {
	db, err := storage.ConnectDB(storage.DBSpec{
		Host:     "localhost",
		Port:     5432,
		User:     "exampleuser",
		Password: "examplepass",
		DBname:   "exampledb",
	})
	if err != nil {
		// Skip, not Errorf: Errorf would continue and then dereference a
		// nil db below. Include the error so the cause is visible.
		t.Skipf("cannot connect to db: %v", err)
	}
	// Check and set up the database.
	s := storage.StorageContainer{RequestID: 12345, DB: db}
	if err := s.SetupDB(); err != nil {
		t.Fatalf("cannot set up db: %v", err)
	}
}

View File