commit 461ea1886d4f8acece212b42d58b0eda717f951b Author: Michael Hohn Date: Fri Dec 13 15:37:13 2024 -0800 initial copy diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..e5a7107 --- /dev/null +++ b/Makefile @@ -0,0 +1,9 @@ +all: mrvaagent + +mrvaagent: + # GOOS=linux GOARCH=arm64 go build + go build + +clean: + rm mrvaagent + diff --git a/agent.go b/agent.go new file mode 100644 index 0000000..9f1a907 --- /dev/null +++ b/agent.go @@ -0,0 +1,246 @@ +package main + +import ( + "context" + "fmt" + "log/slog" + "os" + "path/filepath" + "runtime" + "sync" + "time" + + "github.com/hohn/mrvacommander/pkg/artifactstore" + "github.com/hohn/mrvacommander/pkg/codeql" + "github.com/hohn/mrvacommander/pkg/common" + "github.com/hohn/mrvacommander/pkg/qldbstore" + "github.com/hohn/mrvacommander/pkg/queue" + "github.com/hohn/mrvacommander/utils" + + "github.com/elastic/go-sysinfo" + "github.com/google/uuid" +) + +const ( + workerMemoryMB = 2048 // 2 GB + monitorIntervalSec = 10 // Monitor every 10 seconds +) + +func calculateWorkers() int { + host, err := sysinfo.Host() + if err != nil { + slog.Error("failed to get host info", "error", err) + os.Exit(1) + } + + memInfo, err := host.Memory() + if err != nil { + slog.Error("failed to get memory info", "error", err) + os.Exit(1) + } + + // Get available memory in MB + totalMemoryMB := memInfo.Available / (1024 * 1024) + + // Ensure we have at least one worker + workers := int(totalMemoryMB / workerMemoryMB) + if workers < 1 { + workers = 1 + } + + // Limit the number of workers to the number of CPUs + cpuCount := runtime.NumCPU() + if workers > cpuCount { + workers = max(cpuCount, 1) + } + + return workers +} + +func StartAndMonitorWorkers(ctx context.Context, + artifacts artifactstore.Store, + databases qldbstore.Store, + queue queue.Queue, + desiredWorkerCount int, + wg *sync.WaitGroup) { + + currentWorkerCount := 0 + stopChans := make([]chan struct{}, 0) + + if desiredWorkerCount != 0 { + slog.Info("Starting workers", slog.Int("count", desiredWorkerCount)) + for i := 0; i < desiredWorkerCount; i++ { + stopChan := make(chan struct{}) + stopChans = append(stopChans, stopChan) + wg.Add(1) + go RunWorker(ctx, artifacts, databases, queue, stopChan, wg) + } + return + } + + slog.Info("Worker count not specified, managing based on available memory and CPU") + + for { + select { + case <-ctx.Done(): + // signal all workers to stop + for _, stopChan := range stopChans { + close(stopChan) + } + return + default: + newWorkerCount := calculateWorkers() + + if newWorkerCount != currentWorkerCount { + slog.Info( + "Modifying worker count", + slog.Int("current", currentWorkerCount), + slog.Int("new", newWorkerCount)) + } + + if newWorkerCount > currentWorkerCount { + for i := currentWorkerCount; i < newWorkerCount; i++ { + stopChan := make(chan struct{}) + stopChans = append(stopChans, stopChan) + wg.Add(1) + go RunWorker(ctx, artifacts, databases, queue, stopChan, wg) + } + } else if newWorkerCount < currentWorkerCount { + for i := newWorkerCount; i < currentWorkerCount; i++ { + close(stopChans[i]) + } + stopChans = stopChans[:newWorkerCount] + } + currentWorkerCount = newWorkerCount + + time.Sleep(monitorIntervalSec * time.Second) + } + } +} + +// RunAnalysisJob runs a CodeQL analysis job (AnalyzeJob) returning an AnalyzeResult +func RunAnalysisJob( + job queue.AnalyzeJob, artifacts artifactstore.Store, dbs qldbstore.Store) (queue.AnalyzeResult, error) { + var result = queue.AnalyzeResult{ + Spec: job.Spec, + ResultCount: 0, + ResultLocation: artifactstore.ArtifactLocation{}, + Status: common.StatusError, + } + + // Create a temporary directory + tempDir := filepath.Join(os.TempDir(), uuid.New().String()) + if err := os.MkdirAll(tempDir, 0600); err != nil { + return result, fmt.Errorf("failed to create temporary directory: %v", err) + } + defer os.RemoveAll(tempDir) + + // Download the query pack as a byte slice + queryPackData, err := artifacts.GetQueryPack(job.QueryPackLocation) + if err != nil { + return result, fmt.Errorf("failed to download query pack: %w", err) + } + + // Write the query pack data to the filesystem + queryPackArchivePath := filepath.Join(tempDir, "query-pack.tar.gz") + if err := os.WriteFile(queryPackArchivePath, queryPackData, 0600); err != nil { + return result, fmt.Errorf("failed to write query pack archive to disk: %w", err) + } + + // Make a directory and extract the query pack + queryPackPath := filepath.Join(tempDir, "pack") + if err := os.Mkdir(queryPackPath, 0600); err != nil { + return result, fmt.Errorf("failed to create query pack directory: %w", err) + } + if err := utils.UntarGz(queryPackArchivePath, queryPackPath); err != nil { + return result, fmt.Errorf("failed to extract query pack: %w", err) + } + + databaseData, err := dbs.GetDatabase(job.Spec.NameWithOwner) + if err != nil { + return result, fmt.Errorf("failed to get database: %w", err) + } + + // Write the CodeQL database data to the filesystem + databasePath := filepath.Join(tempDir, "database.zip") + if err := os.WriteFile(databasePath, databaseData, 0600); err != nil { + return result, fmt.Errorf("failed to write CodeQL database to disk: %w", err) + } + + // Perform the CodeQL analysis + runResult, err := codeql.RunQuery(databasePath, job.QueryLanguage, queryPackPath, tempDir) + if err != nil { + return result, fmt.Errorf("failed to run analysis: %w", err) + } + + // Generate a ZIP archive containing SARIF and BQRS files + resultsArchive, err := codeql.GenerateResultsZipArchive(runResult) + if err != nil { + return result, fmt.Errorf("failed to generate results archive: %w", err) + } + + // Upload the archive to storage + slog.Debug("Results archive size", slog.Int("size", len(resultsArchive))) + resultsLocation, err := artifacts.SaveResult(job.Spec, resultsArchive) + if err != nil { + return result, fmt.Errorf("failed to save results archive: %w", err) + } + + result = queue.AnalyzeResult{ + Spec: job.Spec, + ResultCount: runResult.ResultCount, + ResultLocation: resultsLocation, + Status: common.StatusSuccess, + SourceLocationPrefix: runResult.SourceLocationPrefix, + DatabaseSHA: runResult.DatabaseSHA, + } + + return result, nil +} + +// RunWorker runs a worker that processes jobs from queue +func RunWorker(ctx context.Context, + artifacts artifactstore.Store, + databases qldbstore.Store, + queue queue.Queue, + stopChan chan struct{}, + wg *sync.WaitGroup) { + const ( + WORKER_COUNT_STOP_MESSAGE = "Worker stopping due to reduction in worker count" + WORKER_CONTEXT_STOP_MESSAGE = "Worker stopping due to context cancellation" + ) + + defer wg.Done() + slog.Info("Worker started") + for { + select { + case <-stopChan: + slog.Info(WORKER_COUNT_STOP_MESSAGE) + return + case <-ctx.Done(): + slog.Info(WORKER_CONTEXT_STOP_MESSAGE) + return + default: + select { + case job, ok := <-queue.Jobs(): + if !ok { + return + } + slog.Info("Running analysis job", slog.Any("job", job)) + result, err := RunAnalysisJob(job, artifacts, databases) + if err != nil { + slog.Error("Failed to run analysis job", slog.Any("error", err)) + continue + } + slog.Info("Analysis job completed", slog.Any("result", result)) + queue.Results() <- result + case <-stopChan: + slog.Info(WORKER_COUNT_STOP_MESSAGE) + return + case <-ctx.Done(): + slog.Info(WORKER_CONTEXT_STOP_MESSAGE) + return + } + } + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..9511c7a --- /dev/null +++ b/go.mod @@ -0,0 +1,32 @@ +module mrvaagent + +go 1.22.0 + +require ( + github.com/BurntSushi/toml v1.4.0 + github.com/elastic/go-sysinfo v1.14.0 + github.com/google/uuid v1.6.0 + github.com/hohn/mrvacommander v0.2.1 +) + +require ( + github.com/dustin/go-humanize v1.0.1 // indirect + github.com/elastic/go-windows v1.0.1 // indirect + github.com/goccy/go-json v0.10.2 // indirect + github.com/klauspost/compress v1.17.6 // indirect + github.com/klauspost/cpuid/v2 v2.2.6 // indirect + github.com/minio/md5-simd v1.1.2 // indirect + github.com/minio/minio-go/v7 v7.0.71 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/prometheus/procfs v0.15.1 // indirect + github.com/rabbitmq/amqp091-go v1.10.0 // indirect + github.com/rs/xid v1.5.0 // indirect + golang.org/x/crypto v0.24.0 // indirect + golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f // indirect + golang.org/x/net v0.23.0 // indirect + golang.org/x/sys v0.21.0 // indirect + golang.org/x/text v0.16.0 // indirect + gopkg.in/ini.v1 v1.67.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect + howett.net/plist v1.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..c99d906 --- /dev/null +++ b/go.sum @@ -0,0 +1,67 @@ +github.com/BurntSushi/toml v1.4.0 h1:kuoIxZQy2WRRk1pttg9asf+WVv6tWQuBNVmK8+nqPr0= +github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/elastic/go-sysinfo v1.14.0 h1:dQRtiqLycoOOla7IflZg3aN213vqJmP0lpVpKQ9lUEY= +github.com/elastic/go-sysinfo v1.14.0/go.mod h1:FKUXnZWhnYI0ueO7jhsGV3uQJ5hiz8OqM5b3oGyaRr8= +github.com/elastic/go-windows v1.0.1 h1:AlYZOldA+UJ0/2nBuqWdo90GFCgG9xuyw9SYzGUtJm0= +github.com/elastic/go-windows v1.0.1/go.mod h1:FoVvqWSun28vaDQPbj2Elfc0JahhPB7WQEGa3c814Ss= +github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= +github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hohn/mrvacommander v0.2.1 h1:HsP2Cq+pJ2e2FFagbfP9z60gvGwd1aG1srPMQ41A/Z4= +github.com/hohn/mrvacommander v0.2.1/go.mod h1:q3rqsmLGhtjrTinUEXS1z2mwMPaTH99WeTQ45zyhdBQ= +github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= +github.com/klauspost/compress v1.17.6 h1:60eq2E/jlfwQXtvZEeBUYADs+BwKBWURIY+Gj2eRGjI= +github.com/klauspost/compress v1.17.6/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= +github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc= +github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= +github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= +github.com/minio/minio-go/v7 v7.0.71 h1:No9XfOKTYi6i0GnBj+WZwD8WP5GZfL7n7GOjRqCdAjA= +github.com/minio/minio-go/v7 v7.0.71/go.mod h1:4yBA8v80xGA30cfM3fz0DKYMXunWl/AV/6tWEs9ryzo= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= +github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= +github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc= +github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= +golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= +golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f h1:XdNn9LlyWAhLVp6P/i8QYBW+hlyhrhei9uErw2B5GJo= +golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f/go.mod h1:D5SMRVC3C2/4+F/DB1wZsLRnSNimn2Sp/NPsCrsv8ak= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= +gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/yaml.v1 v1.0.0-20140924161607-9f9df34309c0/go.mod h1:WDnlLJ4WF5VGsH/HVa3CI79GS0ol3YnhVnKP89i0kNg= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +howett.net/plist v1.0.1 h1:37GdZ8tP09Q35o9ych3ehygcsL+HqKSwzctveSlarvM= +howett.net/plist v1.0.1/go.mod h1:lqaXoTrLY4hg8tnEzNru53gicrbv7rrk+2xJA/7hw9g= diff --git a/main.go b/main.go new file mode 100644 index 0000000..b6efcd4 --- /dev/null +++ b/main.go @@ -0,0 +1,73 @@ +package main + +import ( + "context" + "flag" + "log" + "log/slog" + "os" + "os/signal" + "sync" + "syscall" + + "github.com/hohn/mrvacommander/pkg/agent" + "github.com/hohn/mrvacommander/pkg/deploy" +) + +func main() { + slog.Info("Starting agent") + workerCount := flag.Int("workers", 0, "number of workers") + logLevel := flag.String("loglevel", "info", "Set log level: debug, info, warn, error") + flag.Parse() + + // Apply 'loglevel' flag + switch *logLevel { + case "debug": + slog.SetLogLoggerLevel(slog.LevelDebug) + case "info": + slog.SetLogLoggerLevel(slog.LevelInfo) + case "warn": + slog.SetLogLoggerLevel(slog.LevelWarn) + case "error": + slog.SetLogLoggerLevel(slog.LevelError) + default: + log.Printf("Invalid logging verbosity level: %s", *logLevel) + os.Exit(1) + } + + isAgent := true + + rabbitMQQueue, err := deploy.InitRabbitMQ(isAgent) + if err != nil { + slog.Error("Failed to initialize RabbitMQ", slog.Any("error", err)) + os.Exit(1) + } + defer rabbitMQQueue.Close() + + artifacts, err := deploy.InitMinIOArtifactStore() + if err != nil { + slog.Error("Failed to initialize artifact store", slog.Any("error", err)) + os.Exit(1) + } + + databases, err := deploy.InitMinIOCodeQLDatabaseStore() + if err != nil { + slog.Error("Failed to initialize database store", slog.Any("error", err)) + os.Exit(1) + } + + var wg sync.WaitGroup + ctx, cancel := context.WithCancel(context.Background()) + go agent.StartAndMonitorWorkers(ctx, artifacts, databases, rabbitMQQueue, *workerCount, &wg) + slog.Info("Agent started") + + // Gracefully exit on SIGINT/SIGTERM + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + <-sigChan + + slog.Info("Shutting down agent") + cancel() + wg.Wait() + slog.Info("Agent shutdown complete") +} diff --git a/mrvaagent.code-workspace b/mrvaagent.code-workspace new file mode 100644 index 0000000..d89cad9 --- /dev/null +++ b/mrvaagent.code-workspace @@ -0,0 +1,10 @@ +{ + "folders": [ + { + "path": "." + } + ], + "settings": { + "makefile.configureOnOpen": false + } +} \ No newline at end of file