Merge the agent-impl branch into the server branch

Michael Hohn
2024-06-16 19:21:42 -07:00
committed by Michael Hohn
26 changed files with 1406 additions and 378 deletions


@@ -1,23 +1,21 @@
package agent
import (
"context"
"fmt"
"log/slog"
"mrvacommander/pkg/codeql"
"mrvacommander/pkg/common"
"mrvacommander/pkg/logger"
"mrvacommander/pkg/qpstore"
"mrvacommander/pkg/queue"
"mrvacommander/pkg/storage"
"archive/tar"
"archive/zip"
"compress/gzip"
"io"
"path/filepath"
"mrvacommander/utils"
"os"
"os/exec"
"sync"
"github.com/google/uuid"
)
type RunnerSingle struct {
@@ -43,202 +41,123 @@ type Visibles struct {
}
func (r *RunnerSingle) worker(wid int) {
// TODO: reimplement this later
/*
var job common.AnalyzeJob
for {
job = <-r.queue.Jobs()
slog.Debug("Picking up job", "job", job, "worker", wid)
slog.Debug("Analysis: running", "job", job)
storage.SetStatus(job.QueryPackId, job.ORepo, common.StatusQueued)
resultFile, err := r.RunAnalysis(job)
if err != nil {
continue
}
slog.Debug("Analysis run finished", "job", job)
res := common.AnalyzeResult{
RunAnalysisSARIF: resultFile,
RunAnalysisBQRS: "", // FIXME ?
}
r.queue.Results() <- res
storage.SetStatus(job.QueryPackId, job.ORepo, common.StatusSuccess)
storage.SetResult(job.QueryPackId, job.ORepo, res)
}
}
func (r *RunnerSingle) RunAnalysis(job common.AnalyzeJob) (string, error) {
// TODO Add multi-language tests including queryLanguage
// queryPackID, queryLanguage, dbOwner, dbRepo :=
// job.QueryPackId, job.QueryLanguage, job.ORL.Owner, job.ORL.Repo
queryPackID, dbOwner, dbRepo :=
job.QueryPackId, job.ORepo.Owner, job.ORepo.Repo
serverRoot := os.Getenv("MRVA_SERVER_ROOT")
// Set up derived paths
dbPath := filepath.Join(serverRoot, "var/codeql/dbs", dbOwner, dbRepo)
dbZip := filepath.Join(serverRoot, "codeql/dbs", dbOwner, dbRepo,
fmt.Sprintf("%s_%s_db.zip", dbOwner, dbRepo))
dbExtract := filepath.Join(serverRoot, "var/codeql/dbs", dbOwner, dbRepo)
queryPack := filepath.Join(serverRoot,
"var/codeql/querypacks", fmt.Sprintf("qp-%d.tgz", queryPackID))
queryExtract := filepath.Join(serverRoot,
"var/codeql/querypacks", fmt.Sprintf("qp-%d", queryPackID))
queryOutDir := filepath.Join(serverRoot,
"var/codeql/sarif/localrun", dbOwner, dbRepo)
queryOutFile := filepath.Join(queryOutDir,
fmt.Sprintf("%s_%s.sarif", dbOwner, dbRepo))
// Prepare directory, extract database
if err := os.MkdirAll(dbExtract, 0755); err != nil {
slog.Error("Failed to create DB directory", "path", dbExtract, "error", err)
return "", err
}
if err := unzipFile(dbZip, dbExtract); err != nil {
slog.Error("Failed to unzip DB", "path", dbZip, "error", err)
return "", err
}
// Prepare directory, extract query pack
if err := os.MkdirAll(queryExtract, 0755); err != nil {
slog.Error("Failed to create query pack directory", "path", queryExtract, "error", err)
return "", err
}
if err := untarGz(queryPack, queryExtract); err != nil {
slog.Error("Failed to extract query pack", "path", queryPack, "error", err)
return "", err
}
// Prepare query result directory
if err := os.MkdirAll(queryOutDir, 0755); err != nil {
slog.Error("Failed to create query result directory", "path", queryOutDir, "error", err)
return "", err
}
// Run database analyze
cmd := exec.Command("codeql", "database", "analyze",
"--format=sarif-latest", "--rerun", "--output", queryOutFile,
"-j8", dbPath, queryExtract)
cmd.Dir = serverRoot
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
slog.Error("codeql database analyze failed:", "error", err, "job", job)
storage.SetStatus(job.QueryPackId, job.ORepo, common.StatusError)
return "", err
}
// Return result path
return queryOutFile, nil
}
// unzipFile extracts a zip file to the specified destination
func unzipFile(zipFile, dest string) error {
r, err := zip.OpenReader(zipFile)
if err != nil {
return err
}
defer r.Close()
for _, f := range r.File {
fPath := filepath.Join(dest, f.Name)
if f.FileInfo().IsDir() {
if err := os.MkdirAll(fPath, os.ModePerm); err != nil {
return err
}
continue
}
if err := os.MkdirAll(filepath.Dir(fPath), os.ModePerm); err != nil {
return err
}
outFile, err := os.OpenFile(fPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode())
if err != nil {
return err
}
rc, err := f.Open()
if err != nil {
outFile.Close()
return err
}
_, err = io.Copy(outFile, rc)
outFile.Close()
rc.Close()
if err != nil {
return err
}
}
return nil
}
// untarGz extracts a tar.gz file to the specified destination.
func untarGz(tarGzFile, dest string) error {
file, err := os.Open(tarGzFile)
if err != nil {
return err
}
defer file.Close()
gzr, err := gzip.NewReader(file)
if err != nil {
return err
}
defer gzr.Close()
return untar(gzr, dest)
}
// untar extracts a tar archive to the specified destination.
func untar(r io.Reader, dest string) error {
tr := tar.NewReader(r)
for {
header, err := tr.Next()
if err == io.EOF {
break
}
if err != nil {
return err
}
fPath := filepath.Join(dest, header.Name)
if header.Typeflag == tar.TypeDir {
if err := os.MkdirAll(fPath, os.ModePerm); err != nil {
return err
}
} else {
if err := os.MkdirAll(filepath.Dir(fPath), os.ModePerm); err != nil {
return err
}
outFile, err := os.OpenFile(fPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.FileMode(header.Mode))
if err != nil {
return err
}
if _, err := io.Copy(outFile, tr); err != nil {
outFile.Close()
return err
}
outFile.Close()
}
}
return nil
}
resultFile, err := RunAnalysis(job)
if err != nil {
continue
}
slog.Debug("Analysis run finished", "job", job)
// TODO: FIX THIS
res := common.AnalyzeResult{
RunAnalysisSARIF: resultFile,
RunAnalysisBQRS: "", // FIXME ?
}
r.queue.Results() <- res
storage.SetStatus(job.QueryPackId, job.NWO, common.StatusSuccess)
storage.SetResult(job.QueryPackId, job.NWO, res)
}
*/
}
// RunAnalysisJob runs a CodeQL analysis job (AnalyzeJob) returning an AnalyzeResult
func RunAnalysisJob(job common.AnalyzeJob) (common.AnalyzeResult, error) {
var result = common.AnalyzeResult{
RequestId: job.RequestId,
ResultCount: 0,
ResultArchiveURL: "",
Status: common.StatusError,
}
// Create a temporary directory
tempDir := filepath.Join(os.TempDir(), uuid.New().String())
if err := os.MkdirAll(tempDir, 0755); err != nil {
return result, fmt.Errorf("failed to create temporary directory: %v", err)
}
defer os.RemoveAll(tempDir)
// Extract the query pack
// TODO: download from the 'job' query pack URL
// utils.downloadFile
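// NOTE: the query pack archive and CodeQL database referenced below are hardcoded test fixtures; per the TODOs they should eventually come from the job's query pack URL and storage.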
queryPackPath := filepath.Join(tempDir, "qp-54674")
utils.UntarGz("qp-54674.tgz", queryPackPath)
// Perform the CodeQL analysis
runResult, err := codeql.RunQuery("google_flatbuffers_db.zip", "cpp", queryPackPath, tempDir)
if err != nil {
return result, fmt.Errorf("failed to run analysis: %w", err)
}
// Generate a ZIP archive containing SARIF and BQRS files
resultsArchive, err := codeql.GenerateResultsZipArchive(runResult)
if err != nil {
return result, fmt.Errorf("failed to generate results archive: %w", err)
}
// TODO: Upload the archive to storage
slog.Debug("Results archive size", slog.Int("size", len(resultsArchive)))
result = common.AnalyzeResult{
RequestId: job.RequestId,
ResultCount: runResult.ResultCount,
ResultArchiveURL: "REPLACE_THIS_WITH_STORED_RESULTS_ARCHIVE", // TODO
Status: common.StatusSuccess,
}
return result, nil
}
// RunWorker runs a worker that processes jobs from the queue until it is stopped or the context is cancelled
func RunWorker(ctx context.Context, stopChan chan struct{}, queue queue.Queue, wg *sync.WaitGroup) {
const (
WORKER_COUNT_STOP_MESSAGE = "Worker stopping due to reduction in worker count"
WORKER_CONTEXT_STOP_MESSAGE = "Worker stopping due to context cancellation"
)
defer wg.Done()
slog.Info("Worker started")
for {
select {
case <-stopChan:
slog.Info(WORKER_COUNT_STOP_MESSAGE)
return
case <-ctx.Done():
slog.Info(WORKER_CONTEXT_STOP_MESSAGE)
return
default:
select {
case job, ok := <-queue.Jobs():
if !ok {
return
}
slog.Info("Running analysis job", slog.Any("job", job))
result, err := RunAnalysisJob(job)
if err != nil {
slog.Error("Failed to run analysis job", slog.Any("error", err))
continue
}
slog.Info("Analysis job completed", slog.Any("result", result))
queue.Results() <- result
case <-stopChan:
slog.Info(WORKER_COUNT_STOP_MESSAGE)
return
case <-ctx.Done():
slog.Info(WORKER_CONTEXT_STOP_MESSAGE)
return
}
}
}
}
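
For orientation, the following is a minimal sketch, not part of this commit, of how RunWorker might be fanned out into a worker pool. The helper name startWorkerPool and its signature are hypothetical; it relies only on the queue.Queue interface and the RunWorker function above.

// startWorkerPool is a hypothetical helper (not part of this commit) that
// fans RunWorker out over workerCount goroutines; q may be any queue.Queue.
func startWorkerPool(ctx context.Context, q queue.Queue, workerCount int) (stop func()) {
	stopChan := make(chan struct{})
	var wg sync.WaitGroup
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go RunWorker(ctx, stopChan, q, &wg)
	}
	// Closing stopChan makes every worker's <-stopChan case fire; Wait then
	// blocks until all workers have returned.
	return func() {
		close(stopChan)
		wg.Wait()
	}
}

Callers would invoke the returned stop function, or cancel the context, to shut the pool down; closing stopChan signals every worker because a receive on a closed channel always succeeds.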