Files
mrvacommander/pkg/agent/agent.go

275 lines
7.1 KiB
Go

package agent
import (
"context"
"fmt"
"log/slog"
"mrvacommander/pkg/artifactstore"
"mrvacommander/pkg/codeql"
"mrvacommander/pkg/common"
"mrvacommander/pkg/qldbstore"
"mrvacommander/pkg/queue"
"mrvacommander/utils"
"os"
"path/filepath"
"runtime"
"sync"
"time"
"github.com/elastic/go-sysinfo"
"github.com/google/uuid"
)
/*
type RunnerSingle struct {
queue queue.Queue
}
func NewAgentSingle(numWorkers int, v *Visibles) *RunnerSingle {
r := RunnerSingle{queue: v.Queue}
for id := 1; id <= numWorkers; id++ {
go r.worker(id)
}
return &r
}
func (r *RunnerSingle) worker(wid int) {
var job common.AnalyzeJob
for {
job = <-r.queue.Jobs()
result, err := RunAnalysisJob(job)
if err != nil {
slog.Error("Failed to run analysis job", slog.Any("error", err))
continue
}
r.queue.Results() <- result
}
}
*/
const (
workerMemoryMB = 2048 // 2 GB
monitorIntervalSec = 10 // Monitor every 10 seconds
)
func calculateWorkers() int {
host, err := sysinfo.Host()
if err != nil {
slog.Error("failed to get host info", "error", err)
os.Exit(1)
}
memInfo, err := host.Memory()
if err != nil {
slog.Error("failed to get memory info", "error", err)
os.Exit(1)
}
// Get available memory in MB
totalMemoryMB := memInfo.Available / (1024 * 1024)
// Ensure we have at least one worker
workers := int(totalMemoryMB / workerMemoryMB)
if workers < 1 {
workers = 1
}
// Limit the number of workers to the number of CPUs
cpuCount := runtime.NumCPU()
if workers > cpuCount {
workers = max(cpuCount, 1)
}
return workers
}
func StartAndMonitorWorkers(ctx context.Context,
artifacts artifactstore.Store,
databases qldbstore.Store,
queue queue.Queue,
desiredWorkerCount int,
wg *sync.WaitGroup) {
currentWorkerCount := 0
stopChans := make([]chan struct{}, 0)
if desiredWorkerCount != 0 {
slog.Info("Starting workers", slog.Int("count", desiredWorkerCount))
for i := 0; i < desiredWorkerCount; i++ {
stopChan := make(chan struct{})
stopChans = append(stopChans, stopChan)
wg.Add(1)
go RunWorker(ctx, artifacts, databases, queue, stopChan, wg)
}
return
}
slog.Info("Worker count not specified, managing based on available memory and CPU")
for {
select {
case <-ctx.Done():
// signal all workers to stop
for _, stopChan := range stopChans {
close(stopChan)
}
return
default:
newWorkerCount := calculateWorkers()
if newWorkerCount != currentWorkerCount {
slog.Info(
"Modifying worker count",
slog.Int("current", currentWorkerCount),
slog.Int("new", newWorkerCount))
}
if newWorkerCount > currentWorkerCount {
for i := currentWorkerCount; i < newWorkerCount; i++ {
stopChan := make(chan struct{})
stopChans = append(stopChans, stopChan)
wg.Add(1)
go RunWorker(ctx, artifacts, databases, queue, stopChan, wg)
}
} else if newWorkerCount < currentWorkerCount {
for i := newWorkerCount; i < currentWorkerCount; i++ {
close(stopChans[i])
}
stopChans = stopChans[:newWorkerCount]
}
currentWorkerCount = newWorkerCount
time.Sleep(monitorIntervalSec * time.Second)
}
}
}
// RunAnalysisJob runs a CodeQL analysis job (AnalyzeJob) returning an AnalyzeResult
func RunAnalysisJob(
job queue.AnalyzeJob, artifacts artifactstore.Store, dbs qldbstore.Store) (queue.AnalyzeResult, error) {
var result = queue.AnalyzeResult{
Spec: job.Spec,
ResultCount: 0,
ResultLocation: artifactstore.ArtifactLocation{},
Status: common.StatusError,
}
// Create a temporary directory
tempDir := filepath.Join(os.TempDir(), uuid.New().String())
if err := os.MkdirAll(tempDir, 0600); err != nil {
return result, fmt.Errorf("failed to create temporary directory: %v", err)
}
defer os.RemoveAll(tempDir)
// Download the query pack as a byte slice
queryPackData, err := artifacts.GetQueryPack(job.QueryPackLocation)
if err != nil {
return result, fmt.Errorf("failed to download query pack: %w", err)
}
// Write the query pack data to the filesystem
queryPackArchivePath := filepath.Join(tempDir, "query-pack.tar.gz")
if err := os.WriteFile(queryPackArchivePath, queryPackData, 0600); err != nil {
return result, fmt.Errorf("failed to write query pack archive to disk: %w", err)
}
// Make a directory and extract the query pack
queryPackPath := filepath.Join(tempDir, "pack")
if err := os.Mkdir(queryPackPath, 0600); err != nil {
return result, fmt.Errorf("failed to create query pack directory: %w", err)
}
if err := utils.UntarGz(queryPackArchivePath, queryPackPath); err != nil {
return result, fmt.Errorf("failed to extract query pack: %w", err)
}
databaseData, err := dbs.GetDatabase(job.Spec.NameWithOwner)
if err != nil {
return result, fmt.Errorf("failed to get database: %w", err)
}
// Write the CodeQL database data to the filesystem
databasePath := filepath.Join(tempDir, "database.zip")
if err := os.WriteFile(databasePath, databaseData, 0600); err != nil {
return result, fmt.Errorf("failed to write CodeQL database to disk: %w", err)
}
// Perform the CodeQL analysis
runResult, err := codeql.RunQuery(databasePath, job.QueryLanguage, queryPackPath, tempDir)
if err != nil {
return result, fmt.Errorf("failed to run analysis: %w", err)
}
// Generate a ZIP archive containing SARIF and BQRS files
resultsArchive, err := codeql.GenerateResultsZipArchive(runResult)
if err != nil {
return result, fmt.Errorf("failed to generate results archive: %w", err)
}
// Upload the archive to storage
slog.Debug("Results archive size", slog.Int("size", len(resultsArchive)))
resultsLocation, err := artifacts.SaveResult(job.Spec, resultsArchive)
if err != nil {
return result, fmt.Errorf("failed to save results archive: %w", err)
}
result = queue.AnalyzeResult{
Spec: job.Spec,
ResultCount: runResult.ResultCount,
ResultLocation: resultsLocation,
Status: common.StatusSuccess,
SourceLocationPrefix: runResult.SourceLocationPrefix,
DatabaseSHA: runResult.DatabaseSHA,
}
return result, nil
}
// RunWorker runs a worker that processes jobs from queue
func RunWorker(ctx context.Context,
artifacts artifactstore.Store,
databases qldbstore.Store,
queue queue.Queue,
stopChan chan struct{},
wg *sync.WaitGroup) {
const (
WORKER_COUNT_STOP_MESSAGE = "Worker stopping due to reduction in worker count"
WORKER_CONTEXT_STOP_MESSAGE = "Worker stopping due to context cancellation"
)
defer wg.Done()
slog.Info("Worker started")
for {
select {
case <-stopChan:
slog.Info(WORKER_COUNT_STOP_MESSAGE)
return
case <-ctx.Done():
slog.Info(WORKER_CONTEXT_STOP_MESSAGE)
return
default:
select {
case job, ok := <-queue.Jobs():
if !ok {
return
}
slog.Info("Running analysis job", slog.Any("job", job))
result, err := RunAnalysisJob(job, artifacts, databases)
if err != nil {
slog.Error("Failed to run analysis job", slog.Any("error", err))
continue
}
slog.Info("Analysis job completed", slog.Any("result", result))
queue.Results() <- result
case <-stopChan:
slog.Info(WORKER_COUNT_STOP_MESSAGE)
return
case <-ctx.Done():
slog.Info(WORKER_CONTEXT_STOP_MESSAGE)
return
}
}
}
}