Drop dynamic worker count; set default to 1

This commit is contained in:
Michael Hohn
2025-05-12 14:59:59 -07:00
committed by =Michael Hohn
parent 75e57dc0a8
commit bde8ac2db7
2 changed files with 20 additions and 83 deletions

View File

@@ -16,7 +16,7 @@ import (
func main() { func main() {
slog.Info("Starting agent") slog.Info("Starting agent")
workerCount := flag.Int("workers", 0, "number of workers") workerCount := flag.Int("workers", 1, "number of workers")
logLevel := flag.String("loglevel", "info", "Set log level: debug, info, warn, error") logLevel := flag.String("loglevel", "info", "Set log level: debug, info, warn, error")
flag.Parse() flag.Parse()

View File

@@ -6,9 +6,7 @@ import (
"log/slog" "log/slog"
"os" "os"
"path/filepath" "path/filepath"
"runtime"
"sync" "sync"
"time"
"github.com/hohn/mrvacommander/pkg/artifactstore" "github.com/hohn/mrvacommander/pkg/artifactstore"
"github.com/hohn/mrvacommander/pkg/codeql" "github.com/hohn/mrvacommander/pkg/codeql"
@@ -17,7 +15,6 @@ import (
"github.com/hohn/mrvacommander/pkg/queue" "github.com/hohn/mrvacommander/pkg/queue"
"github.com/hohn/mrvacommander/utils" "github.com/hohn/mrvacommander/utils"
"github.com/elastic/go-sysinfo"
"github.com/google/uuid" "github.com/google/uuid"
) )
@@ -51,41 +48,9 @@ func (r *RunnerSingle) worker(wid int) {
*/ */
const ( const (
workerMemoryMB = 2048 // 2 GB workerMemoryMB = 2048 // 2 GB
monitorIntervalSec = 10 // Monitor every 10 seconds
) )
func calculateWorkers() int {
host, err := sysinfo.Host()
if err != nil {
slog.Error("failed to get host info", "error", err)
os.Exit(1)
}
memInfo, err := host.Memory()
if err != nil {
slog.Error("failed to get memory info", "error", err)
os.Exit(1)
}
// Get available memory in MB
totalMemoryMB := memInfo.Available / (1024 * 1024)
// Ensure we have at least one worker
workers := int(totalMemoryMB / workerMemoryMB)
if workers < 1 {
workers = 1
}
// Limit the number of workers to the number of CPUs
cpuCount := runtime.NumCPU()
if workers > cpuCount {
workers = max(cpuCount, 1)
}
return workers
}
func StartAndMonitorWorkers(ctx context.Context, func StartAndMonitorWorkers(ctx context.Context,
artifacts artifactstore.Store, artifacts artifactstore.Store,
databases qldbstore.Store, databases qldbstore.Store,
@@ -93,57 +58,29 @@ func StartAndMonitorWorkers(ctx context.Context,
desiredWorkerCount int, desiredWorkerCount int,
wg *sync.WaitGroup) { wg *sync.WaitGroup) {
currentWorkerCount := 0 var workerCount int
stopChans := make([]chan struct{}, 0) if desiredWorkerCount > 0 {
workerCount = desiredWorkerCount
if desiredWorkerCount != 0 { slog.Info("Starting fixed number of workers", slog.Int("count", workerCount))
slog.Info("Starting workers", slog.Int("count", desiredWorkerCount)) } else {
for i := 0; i < desiredWorkerCount; i++ { workerCount = 1
stopChan := make(chan struct{}) slog.Info("Starting preset number of workers", slog.Int("count", workerCount))
stopChans = append(stopChans, stopChan)
wg.Add(1)
go RunWorker(ctx, artifacts, databases, queue, stopChan, wg)
}
return
} }
slog.Info("Worker count not specified, managing based on available memory and CPU") stopChans := make([]chan struct{}, workerCount)
for { for i := 0; i < workerCount; i++ {
select { stopChan := make(chan struct{})
case <-ctx.Done(): stopChans[i] = stopChan
// signal all workers to stop wg.Add(1)
for _, stopChan := range stopChans { go RunWorker(ctx, artifacts, databases, queue, stopChan, wg)
close(stopChan) }
}
return
default:
newWorkerCount := calculateWorkers()
if newWorkerCount != currentWorkerCount { // Wait for context cancellation
slog.Info( <-ctx.Done()
"Modifying worker count",
slog.Int("current", currentWorkerCount),
slog.Int("new", newWorkerCount))
}
if newWorkerCount > currentWorkerCount { for _, stopChan := range stopChans {
for i := currentWorkerCount; i < newWorkerCount; i++ { close(stopChan)
stopChan := make(chan struct{})
stopChans = append(stopChans, stopChan)
wg.Add(1)
go RunWorker(ctx, artifacts, databases, queue, stopChan, wg)
}
} else if newWorkerCount < currentWorkerCount {
for i := newWorkerCount; i < currentWorkerCount; i++ {
close(stopChans[i])
}
stopChans = stopChans[:newWorkerCount]
}
currentWorkerCount = newWorkerCount
time.Sleep(monitorIntervalSec * time.Second)
}
} }
} }