wip: add analysis runner / agent, separate Server/Queue/Agent, use New* initializers
This commit is contained in:
committed by
=Michael Hohn
parent
2ab596bf1d
commit
f7155eba50
@@ -29,7 +29,7 @@ func main() {
|
|||||||
log.Printf("Usage of %s:\n", os.Args[0])
|
log.Printf("Usage of %s:\n", os.Args[0])
|
||||||
flag.PrintDefaults()
|
flag.PrintDefaults()
|
||||||
log.Println("\nExamples:")
|
log.Println("\nExamples:")
|
||||||
log.Println(" go run main.go --loglevel=Debug --mode=container")
|
log.Println(" go run main.go --loglevel=debug --mode=container")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parse the flags
|
// Parse the flags
|
||||||
@@ -68,16 +68,21 @@ func main() {
|
|||||||
switch *mode {
|
switch *mode {
|
||||||
case "standalone":
|
case "standalone":
|
||||||
// Assemble single-process version
|
// Assemble single-process version
|
||||||
|
sq := queue.NewQueueSingle(2) // FIXME take value from configuration
|
||||||
|
sc := server.NewCommanderSingle(nil, sq)
|
||||||
|
sl := logger.NewLoggerSingle()
|
||||||
|
ss := storage.NewStorageSingle(config.Storage.StartingID)
|
||||||
|
sr := agent.NewRunnerSingle(2, sq) // FIXME take value from configuration
|
||||||
|
|
||||||
state := server.State{
|
state := server.State{
|
||||||
Commander: &server.CommanderSingle{},
|
Commander: sc,
|
||||||
Logger: &logger.LoggerSingle{},
|
Logger: sl,
|
||||||
Queue: &queue.QueueSingle{},
|
Queue: sq,
|
||||||
Storage: &storage.StorageSingle{CurrentID: config.Storage.StartingID},
|
Storage: ss,
|
||||||
Runner: &agent.RunnerSingle{},
|
Runner: sr,
|
||||||
}
|
}
|
||||||
main := &server.CommanderSingle{}
|
|
||||||
main.Setup(&state)
|
sc.Setup(&state) // sc is part of state and dereferences it
|
||||||
main.Run()
|
|
||||||
|
|
||||||
case "container":
|
case "container":
|
||||||
// Assemble cccontainer
|
// Assemble cccontainer
|
||||||
|
|||||||
57
cmd/server/run-analysis.sh
Executable file
57
cmd/server/run-analysis.sh
Executable file
@@ -0,0 +1,57 @@
|
|||||||
|
#!/bin/bash -x -e
|
||||||
|
#* Minimal setup to run analysis using information provided by a request
|
||||||
|
|
||||||
|
#* Take output saved by the server
|
||||||
|
QUERYPACKID=$1
|
||||||
|
shift
|
||||||
|
QUERYLANGUAGE=$1
|
||||||
|
shift
|
||||||
|
|
||||||
|
# and
|
||||||
|
DBOWNER=$1
|
||||||
|
shift
|
||||||
|
DBREPO=$1
|
||||||
|
|
||||||
|
GMSROOT=/Users/hohn/local/ghes-mirva-server
|
||||||
|
|
||||||
|
#* Set up derived paths
|
||||||
|
DBPATH=$GMSROOT/var/codeql/dbs/$DBOWNER/$DBREPO
|
||||||
|
DBZIP=$GMSROOT/codeql/dbs/$DBOWNER/$DBREPO/${DBOWNER}_${DBREPO}_db.zip
|
||||||
|
DBEXTRACT=$GMSROOT/var/codeql/dbs/$DBOWNER/$DBREPO
|
||||||
|
|
||||||
|
QUERYPACK=$GMSROOT/var/codeql/querypacks/qp-$QUERYPACKID.tgz
|
||||||
|
QUERYEXTRACT=$GMSROOT/var/codeql/querypacks/qp-$QUERYPACKID
|
||||||
|
|
||||||
|
QUERYOUTD=$GMSROOT/var/codeql/sarif/localrun/$DBOWNER/$DBREPO
|
||||||
|
QUERYOUTF=$QUERYOUTD/${DBOWNER}_${DBREPO}.sarif
|
||||||
|
|
||||||
|
#* Prep work before running the command
|
||||||
|
|
||||||
|
#** Extract database
|
||||||
|
mkdir -p $DBEXTRACT && cd $DBEXTRACT
|
||||||
|
unzip -o -q $DBZIP
|
||||||
|
DBINFIX=`\ls | head -1` # Could be cpp, codeql_db, or whatever
|
||||||
|
|
||||||
|
# Extract query pack
|
||||||
|
mkdir -p $QUERYEXTRACT && cd $QUERYEXTRACT
|
||||||
|
tar zxf $QUERYPACK
|
||||||
|
|
||||||
|
#** Prepare target directory
|
||||||
|
mkdir -p $QUERYOUTD
|
||||||
|
|
||||||
|
#* run database analyze
|
||||||
|
cd $GMSROOT
|
||||||
|
codeql database analyze --format=sarif-latest --rerun \
|
||||||
|
--output $QUERYOUTF \
|
||||||
|
-j8 \
|
||||||
|
-- $DBPATH/$DBINFIX $QUERYEXTRACT
|
||||||
|
|
||||||
|
#* report result
|
||||||
|
# var/codeql/sarif/localrun/google/flatbuffers/google_flatbuffers.sarif
|
||||||
|
printf "run-analysis-output in %s\n" $QUERYOUTF
|
||||||
|
# TODO Is the bqrs really necessary? It can also be found by the go code at this point.
|
||||||
|
# The name of the query is only found in the query pack itself and would have to be
|
||||||
|
# extracted from there.
|
||||||
|
# var/codeql/dbs/google/flatbuffers/cpp/results/codeql-remote/query/FlatBuffersFunc.bqrs
|
||||||
|
# BQRSPATH=$GMSROOT/var/codeql/dbs/$DBOWNER/$DBREPO/$LANGUAGE/results/codeql-remote/query/
|
||||||
|
# printf "run-analysis-bqrs in %s\n" $QUERYOUTF
|
||||||
@@ -1,4 +1,86 @@
|
|||||||
package agent
|
package agent
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"bytes"
|
||||||
|
"log/slog"
|
||||||
|
|
||||||
|
"mrvacommander/pkg/common"
|
||||||
|
"mrvacommander/pkg/queue"
|
||||||
|
"mrvacommander/pkg/storage"
|
||||||
|
|
||||||
|
"os"
|
||||||
|
"os/exec"
|
||||||
|
"path"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
type RunnerSingle struct {
|
type RunnerSingle struct {
|
||||||
|
queue queue.Queue
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRunnerSingle(numWorkers int, queue queue.Queue) *RunnerSingle {
|
||||||
|
r := RunnerSingle{queue: queue}
|
||||||
|
|
||||||
|
for id := 1; id <= numWorkers; id++ {
|
||||||
|
go r.worker(id)
|
||||||
|
}
|
||||||
|
return &r
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RunnerSingle) worker(wid int) {
|
||||||
|
var job common.AnalyzeJob
|
||||||
|
|
||||||
|
for {
|
||||||
|
job = <-r.queue.Jobs()
|
||||||
|
|
||||||
|
slog.Debug("Picking up job", "job", job, "worker", wid)
|
||||||
|
|
||||||
|
cwd, err := os.Getwd()
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("RunJob: cwd problem: ", "error", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
slog.Debug("Analysis: running", "job", job)
|
||||||
|
storage.SetStatus(job.QueryPackId, job.ORL, common.StatusQueued)
|
||||||
|
cmd := exec.Command(path.Join(cwd, "cmd", "run-analysis.sh"),
|
||||||
|
strconv.FormatInt(int64(job.QueryPackId), 10),
|
||||||
|
job.QueryLanguage, job.ORL.Owner, job.ORL.Repo)
|
||||||
|
|
||||||
|
out, err := cmd.CombinedOutput()
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("Analysis command failed: exit code: ", "error", err, "job", job)
|
||||||
|
slog.Error("Analysis command failed: ", "job", job, "output", out)
|
||||||
|
storage.SetStatus(job.QueryPackId, job.ORL, common.StatusError)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
slog.Debug("Analysis run finished", "job", job)
|
||||||
|
|
||||||
|
// Get the SARIF ouput location
|
||||||
|
sr := bufio.NewScanner(bytes.NewReader(out))
|
||||||
|
sr.Split(bufio.ScanLines)
|
||||||
|
for {
|
||||||
|
more := sr.Scan()
|
||||||
|
if !more {
|
||||||
|
slog.Error("Analysis run failed to report result: ", "output", out)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
fields := strings.Fields(sr.Text())
|
||||||
|
if len(fields) >= 3 {
|
||||||
|
if fields[0] == "run-analysis-output" {
|
||||||
|
slog.Debug("Analysis run successful: ", "job", job, "location", fields[2])
|
||||||
|
res := common.AnalyzeResult{
|
||||||
|
RunAnalysisSARIF: fields[2], // Abs. path from run-analysis.sh
|
||||||
|
RunAnalysisBQRS: "", // FIXME? see note in run-analysis.sh
|
||||||
|
}
|
||||||
|
r.queue.Results() <- res
|
||||||
|
storage.SetStatus(job.QueryPackId, job.ORL, common.StatusSuccess)
|
||||||
|
storage.SetResult(job.QueryPackId, job.ORL, res)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,3 +2,8 @@ package logger
|
|||||||
|
|
||||||
type LoggerSingle struct {
|
type LoggerSingle struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewLoggerSingle() *LoggerSingle {
|
||||||
|
l := LoggerSingle{}
|
||||||
|
return &l
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,4 +1,14 @@
|
|||||||
package queue
|
package queue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"mrvacommander/pkg/common"
|
||||||
|
"mrvacommander/pkg/storage"
|
||||||
|
)
|
||||||
|
|
||||||
type Queue interface {
|
type Queue interface {
|
||||||
|
Jobs() chan common.AnalyzeJob
|
||||||
|
Results() chan common.AnalyzeResult
|
||||||
|
StartAnalyses(analysis_repos *map[common.OwnerRepo]storage.DBLocation,
|
||||||
|
session_id int,
|
||||||
|
session_language string)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,13 +6,15 @@ import (
|
|||||||
"mrvacommander/pkg/storage"
|
"mrvacommander/pkg/storage"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
func (q *QueueSingle) Jobs() chan common.AnalyzeJob {
|
||||||
NumWorkers int
|
return q.jobs
|
||||||
Jobs chan common.AnalyzeJob
|
}
|
||||||
Results chan common.AnalyzeResult
|
|
||||||
)
|
|
||||||
|
|
||||||
func StartAnalyses(analysis_repos *map[common.OwnerRepo]storage.DBLocation, session_id int,
|
func (q *QueueSingle) Results() chan common.AnalyzeResult {
|
||||||
|
return q.results
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *QueueSingle) StartAnalyses(analysis_repos *map[common.OwnerRepo]storage.DBLocation, session_id int,
|
||||||
session_language string) {
|
session_language string) {
|
||||||
slog.Debug("Queueing codeql database analyze jobs")
|
slog.Debug("Queueing codeql database analyze jobs")
|
||||||
|
|
||||||
@@ -23,7 +25,7 @@ func StartAnalyses(analysis_repos *map[common.OwnerRepo]storage.DBLocation, sess
|
|||||||
|
|
||||||
ORL: orl,
|
ORL: orl,
|
||||||
}
|
}
|
||||||
Jobs <- info
|
q.jobs <- info
|
||||||
storage.SetStatus(session_id, orl, common.StatusQueued)
|
storage.SetStatus(session_id, orl, common.StatusQueued)
|
||||||
storage.AddJob(session_id, info)
|
storage.AddJob(session_id, info)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,4 +1,17 @@
|
|||||||
package queue
|
package queue
|
||||||
|
|
||||||
|
import "mrvacommander/pkg/common"
|
||||||
|
|
||||||
type QueueSingle struct {
|
type QueueSingle struct {
|
||||||
|
NumWorkers int
|
||||||
|
jobs chan common.AnalyzeJob
|
||||||
|
results chan common.AnalyzeResult
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewQueueSingle(numWorkers int) *QueueSingle {
|
||||||
|
q := QueueSingle{}
|
||||||
|
q.jobs = make(chan common.AnalyzeJob, 10)
|
||||||
|
q.results = make(chan common.AnalyzeResult, 10)
|
||||||
|
q.NumWorkers = numWorkers
|
||||||
|
return &q
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,15 +16,11 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"mrvacommander/pkg/common"
|
"mrvacommander/pkg/common"
|
||||||
"mrvacommander/pkg/queue"
|
|
||||||
"mrvacommander/pkg/storage"
|
"mrvacommander/pkg/storage"
|
||||||
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (c *CommanderSingle) Run() {
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *CommanderSingle) Setup(st *State) {
|
func (c *CommanderSingle) Setup(st *State) {
|
||||||
r := mux.NewRouter()
|
r := mux.NewRouter()
|
||||||
c.st = st
|
c.st = st
|
||||||
@@ -289,7 +285,7 @@ func (c *CommanderSingle) MirvaRequest(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
not_found_repos, analysisRepos := c.st.Storage.FindAvailableDBs(session_repositories)
|
not_found_repos, analysisRepos := c.st.Storage.FindAvailableDBs(session_repositories)
|
||||||
|
|
||||||
queue.StartAnalyses(analysisRepos, session_id, session_language)
|
c.queue.StartAnalyses(analysisRepos, session_id, session_language)
|
||||||
|
|
||||||
si := SessionInfo{
|
si := SessionInfo{
|
||||||
ID: session_id,
|
ID: session_id,
|
||||||
|
|||||||
@@ -26,7 +26,13 @@ type SessionInfo struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type CommanderSingle struct {
|
type CommanderSingle struct {
|
||||||
st *State
|
st *State
|
||||||
|
queue queue.Queue
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewCommanderSingle(s *State, q queue.Queue) *CommanderSingle {
|
||||||
|
c := CommanderSingle{s, q}
|
||||||
|
return &c
|
||||||
}
|
}
|
||||||
|
|
||||||
type State struct {
|
type State struct {
|
||||||
|
|||||||
@@ -27,6 +27,11 @@ type StorageSingle struct {
|
|||||||
CurrentID int
|
CurrentID int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewStorageSingle(startingID int) *StorageSingle {
|
||||||
|
s := StorageSingle{CurrentID: startingID}
|
||||||
|
return &s
|
||||||
|
}
|
||||||
|
|
||||||
func (s *StorageSingle) NextID() int {
|
func (s *StorageSingle) NextID() int {
|
||||||
s.CurrentID += 1
|
s.CurrentID += 1
|
||||||
return s.CurrentID
|
return s.CurrentID
|
||||||
@@ -121,6 +126,12 @@ func GetResult(js common.JobSpec) common.AnalyzeResult {
|
|||||||
return ar
|
return ar
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func SetResult(sessionid int, orl common.OwnerRepo, ar common.AnalyzeResult) {
|
||||||
|
mutex.Lock()
|
||||||
|
defer mutex.Unlock()
|
||||||
|
result[common.JobSpec{ID: sessionid, OwnerRepo: orl}] = ar
|
||||||
|
}
|
||||||
|
|
||||||
func PackageResults(ar common.AnalyzeResult, owre common.OwnerRepo, vaid int) (zipPath string, e error) {
|
func PackageResults(ar common.AnalyzeResult, owre common.OwnerRepo, vaid int) (zipPath string, e error) {
|
||||||
slog.Debug("Readying zip file with .sarif/.bqrs", "analyze-result", ar)
|
slog.Debug("Readying zip file with .sarif/.bqrs", "analyze-result", ar)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user