From bde8ac2db7d2018681d8da7f56960e195723e12b Mon Sep 17 00:00:00 2001 From: Michael Hohn Date: Mon, 12 May 2025 14:59:59 -0700 Subject: [PATCH] Drop dynamic worker count; set default to 1 --- cmd/agent/main.go | 2 +- pkg/agent/agent.go | 101 +++++++++------------------------------------ 2 files changed, 20 insertions(+), 83 deletions(-) diff --git a/cmd/agent/main.go b/cmd/agent/main.go index b6efcd4..3666773 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -16,7 +16,7 @@ import ( func main() { slog.Info("Starting agent") - workerCount := flag.Int("workers", 0, "number of workers") + workerCount := flag.Int("workers", 1, "number of workers") logLevel := flag.String("loglevel", "info", "Set log level: debug, info, warn, error") flag.Parse() diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index f579534..632678e 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -6,9 +6,7 @@ import ( "log/slog" "os" "path/filepath" - "runtime" "sync" - "time" "github.com/hohn/mrvacommander/pkg/artifactstore" "github.com/hohn/mrvacommander/pkg/codeql" @@ -17,7 +15,6 @@ import ( "github.com/hohn/mrvacommander/pkg/queue" "github.com/hohn/mrvacommander/utils" - "github.com/elastic/go-sysinfo" "github.com/google/uuid" ) @@ -51,41 +48,9 @@ func (r *RunnerSingle) worker(wid int) { */ const ( - workerMemoryMB = 2048 // 2 GB - monitorIntervalSec = 10 // Monitor every 10 seconds + workerMemoryMB = 2048 // 2 GB ) -func calculateWorkers() int { - host, err := sysinfo.Host() - if err != nil { - slog.Error("failed to get host info", "error", err) - os.Exit(1) - } - - memInfo, err := host.Memory() - if err != nil { - slog.Error("failed to get memory info", "error", err) - os.Exit(1) - } - - // Get available memory in MB - totalMemoryMB := memInfo.Available / (1024 * 1024) - - // Ensure we have at least one worker - workers := int(totalMemoryMB / workerMemoryMB) - if workers < 1 { - workers = 1 - } - - // Limit the number of workers to the number of CPUs - cpuCount := runtime.NumCPU() - if workers > cpuCount { - workers = max(cpuCount, 1) - } - - return workers -} - func StartAndMonitorWorkers(ctx context.Context, artifacts artifactstore.Store, databases qldbstore.Store, @@ -93,57 +58,29 @@ func StartAndMonitorWorkers(ctx context.Context, desiredWorkerCount int, wg *sync.WaitGroup) { - currentWorkerCount := 0 - stopChans := make([]chan struct{}, 0) - - if desiredWorkerCount != 0 { - slog.Info("Starting workers", slog.Int("count", desiredWorkerCount)) - for i := 0; i < desiredWorkerCount; i++ { - stopChan := make(chan struct{}) - stopChans = append(stopChans, stopChan) - wg.Add(1) - go RunWorker(ctx, artifacts, databases, queue, stopChan, wg) - } - return + var workerCount int + if desiredWorkerCount > 0 { + workerCount = desiredWorkerCount + slog.Info("Starting fixed number of workers", slog.Int("count", workerCount)) + } else { + workerCount = 1 + slog.Info("Starting preset number of workers", slog.Int("count", workerCount)) } - slog.Info("Worker count not specified, managing based on available memory and CPU") + stopChans := make([]chan struct{}, workerCount) - for { - select { - case <-ctx.Done(): - // signal all workers to stop - for _, stopChan := range stopChans { - close(stopChan) - } - return - default: - newWorkerCount := calculateWorkers() + for i := 0; i < workerCount; i++ { + stopChan := make(chan struct{}) + stopChans[i] = stopChan + wg.Add(1) + go RunWorker(ctx, artifacts, databases, queue, stopChan, wg) + } - if newWorkerCount != currentWorkerCount { - slog.Info( - "Modifying worker count", - slog.Int("current", currentWorkerCount), - slog.Int("new", newWorkerCount)) - } + // Wait for context cancellation + <-ctx.Done() - if newWorkerCount > currentWorkerCount { - for i := currentWorkerCount; i < newWorkerCount; i++ { - stopChan := make(chan struct{}) - stopChans = append(stopChans, stopChan) - wg.Add(1) - go RunWorker(ctx, artifacts, databases, queue, stopChan, wg) - } - } else if newWorkerCount < currentWorkerCount { - for i := newWorkerCount; i < currentWorkerCount; i++ { - close(stopChans[i]) - } - stopChans = stopChans[:newWorkerCount] - } - currentWorkerCount = newWorkerCount - - time.Sleep(monitorIntervalSec * time.Second) - } + for _, stopChan := range stopChans { + close(stopChan) } }