diff --git a/cmd/agent/main.go b/cmd/agent/main.go deleted file mode 100644 index 3666773..0000000 --- a/cmd/agent/main.go +++ /dev/null @@ -1,73 +0,0 @@ -package main - -import ( - "context" - "flag" - "log" - "log/slog" - "os" - "os/signal" - "sync" - "syscall" - - "github.com/hohn/mrvacommander/pkg/agent" - "github.com/hohn/mrvacommander/pkg/deploy" -) - -func main() { - slog.Info("Starting agent") - workerCount := flag.Int("workers", 1, "number of workers") - logLevel := flag.String("loglevel", "info", "Set log level: debug, info, warn, error") - flag.Parse() - - // Apply 'loglevel' flag - switch *logLevel { - case "debug": - slog.SetLogLoggerLevel(slog.LevelDebug) - case "info": - slog.SetLogLoggerLevel(slog.LevelInfo) - case "warn": - slog.SetLogLoggerLevel(slog.LevelWarn) - case "error": - slog.SetLogLoggerLevel(slog.LevelError) - default: - log.Printf("Invalid logging verbosity level: %s", *logLevel) - os.Exit(1) - } - - isAgent := true - - rabbitMQQueue, err := deploy.InitRabbitMQ(isAgent) - if err != nil { - slog.Error("Failed to initialize RabbitMQ", slog.Any("error", err)) - os.Exit(1) - } - defer rabbitMQQueue.Close() - - artifacts, err := deploy.InitMinIOArtifactStore() - if err != nil { - slog.Error("Failed to initialize artifact store", slog.Any("error", err)) - os.Exit(1) - } - - databases, err := deploy.InitMinIOCodeQLDatabaseStore() - if err != nil { - slog.Error("Failed to initialize database store", slog.Any("error", err)) - os.Exit(1) - } - - var wg sync.WaitGroup - ctx, cancel := context.WithCancel(context.Background()) - go agent.StartAndMonitorWorkers(ctx, artifacts, databases, rabbitMQQueue, *workerCount, &wg) - slog.Info("Agent started") - - // Gracefully exit on SIGINT/SIGTERM - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - <-sigChan - - slog.Info("Shutting down agent") - cancel() - wg.Wait() - slog.Info("Agent shutdown complete") -} diff --git a/cmd/server/main.go b/cmd/server/main.go 
deleted file mode 100644 index 42f5b68..0000000 --- a/cmd/server/main.go +++ /dev/null @@ -1,158 +0,0 @@ -// Copyright © 2024 github -// Licensed under the Apache License, Version 2.0 (the "License"). - -package main - -import ( - "context" - "flag" - "log" - "log/slog" - "os" - "os/signal" - "path/filepath" - "sync" - "syscall" - - "github.com/hohn/mrvacommander/config/mcc" - - "github.com/hohn/mrvacommander/pkg/agent" - "github.com/hohn/mrvacommander/pkg/artifactstore" - "github.com/hohn/mrvacommander/pkg/deploy" - "github.com/hohn/mrvacommander/pkg/qldbstore" - "github.com/hohn/mrvacommander/pkg/queue" - "github.com/hohn/mrvacommander/pkg/server" - "github.com/hohn/mrvacommander/pkg/state" -) - -func main() { - // Define flags - helpFlag := flag.Bool("help", false, "Display help message") - logLevel := flag.String("loglevel", "info", "Set log level: debug, info, warn, error") - mode := flag.String("mode", "standalone", "Set mode: standalone, container, cluster") - dbPathRoot := flag.String("dbpath", "", "Set the root path for the database store if using standalone mode.") - - // Custom usage function for the help flag - flag.Usage = func() { - log.Printf("Usage of %s:\n", os.Args[0]) - flag.PrintDefaults() - log.Println("\nExamples:") - log.Println("go run main.go --loglevel=debug --mode=container --dbpath=/path/to/db_dir") - } - - // Parse the flags - flag.Parse() - - // Handle the help flag - if *helpFlag { - flag.Usage() - return - } - - // Apply 'loglevel' flag - switch *logLevel { - case "debug": - slog.SetLogLoggerLevel(slog.LevelDebug) - case "info": - slog.SetLogLoggerLevel(slog.LevelInfo) - case "warn": - slog.SetLogLoggerLevel(slog.LevelWarn) - case "error": - slog.SetLogLoggerLevel(slog.LevelError) - default: - log.Printf("Invalid logging verbosity level: %s", *logLevel) - os.Exit(1) - } - - // Process database root if standalone and not provided - if *mode == "standalone" && *dbPathRoot == "" { - slog.Warn("No database root path provided.") - // 
Current directory of the Executable has a codeql directory. There. - // Resolve the absolute directory based on os.Executable() - execPath, err := os.Executable() - if err != nil { - slog.Error("Failed to get executable path", slog.Any("error", err)) - os.Exit(1) - } - *dbPathRoot = filepath.Dir(execPath) + "/codeql/dbs/" - slog.Info("Using default database root path", "dbPathRoot", *dbPathRoot) - } - - // Read configuration - config := mcc.LoadConfig("mcconfig.toml") - - // Output configuration summary - log.Printf("Help: %t\n", *helpFlag) - log.Printf("Log Level: %s\n", *logLevel) - log.Printf("Mode: %s\n", *mode) - - // Handle signals - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - - // Apply 'mode' flag - switch *mode { - case "standalone": - // Assemble single-process version - sq := queue.NewQueueSingle(2) - ss := state.NewLocalState(config.Storage.StartingID) - as := artifactstore.NewInMemoryArtifactStore() - ql := qldbstore.NewLocalFilesystemCodeQLDatabaseStore(*dbPathRoot) - - server.NewCommanderSingle(&server.Visibles{ - Queue: sq, - State: ss, - Artifacts: as, - CodeQLDBStore: ql, - }) - - var wg sync.WaitGroup - ctx, cancel := context.WithCancel(context.Background()) - - go agent.StartAndMonitorWorkers(ctx, as, ql, sq, 2, &wg) - - slog.Info("Started server and standalone agent") - <-sigChan - slog.Info("Shutting down...") - cancel() - wg.Wait() - slog.Info("Agent shutdown complete") - - case "container": - isAgent := false - - rabbitMQQueue, err := deploy.InitRabbitMQ(isAgent) - if err != nil { - slog.Error("Failed to initialize RabbitMQ", slog.Any("error", err)) - os.Exit(1) - } - defer rabbitMQQueue.Close() - - artifacts, err := deploy.InitMinIOArtifactStore() - if err != nil { - slog.Error("Failed to initialize artifact store", slog.Any("error", err)) - os.Exit(1) - } - - databases, err := deploy.InitMinIOCodeQLDatabaseStore() - if err != nil { - slog.Error("Failed to initialize database store", 
slog.Any("error", err)) - os.Exit(1) - } - - server.NewCommanderSingle(&server.Visibles{ - Queue: rabbitMQQueue, - State: state.NewLocalState(config.Storage.StartingID), - Artifacts: artifacts, - CodeQLDBStore: databases, - }) - - slog.Info("Started server in container mode.") - <-sigChan - default: - slog.Error("Invalid value for --mode. Allowed values are: standalone, container, cluster") - os.Exit(1) - } - - slog.Info("Server shutdown complete") -} diff --git a/pkg/deploy/init.go b/pkg/deploy/init.go index 9d6a309..c69cac2 100644 --- a/pkg/deploy/init.go +++ b/pkg/deploy/init.go @@ -102,51 +102,6 @@ func InitMinIOArtifactStore() (artifactstore.Store, error) { return store, nil } - -func InitMinIOCodeQLDatabaseStore() (qldbstore.Store, error) { - requiredEnvVars := []string{ - "QLDB_MINIO_ENDPOINT", - "QLDB_MINIO_ID", - "QLDB_MINIO_SECRET", - "MRVA_MINIO_VIRTUAL_HOST", - } - validateEnvVars(requiredEnvVars) - - endpoint := os.Getenv("QLDB_MINIO_ENDPOINT") - id := os.Getenv("QLDB_MINIO_ID") - secret := os.Getenv("QLDB_MINIO_SECRET") - useVirtual := os.Getenv("MRVA_MINIO_VIRTUAL_HOST") == "1" - - var lookup minio.BucketLookupType - var bucketName string - - if useVirtual { - parsedURL, err := url.Parse(endpoint) - if err != nil { - return nil, fmt.Errorf("failed to parse QLDB_MINIO_ENDPOINT: %w", err) - } - hostParts := strings.Split(parsedURL.Hostname(), ".") - if len(hostParts) < 2 { - return nil, fmt.Errorf("unable to extract bucket from host: %s", parsedURL.Hostname()) - } - bucketName = hostParts[0] - lookup = minio.BucketLookupDNS - } else { - bucketName = "mrvabucket" - lookup = minio.BucketLookupPath - } - - // TODO: unify into one. clean up state handling. 
- qldbstore.QL_DB_BUCKETNAME = bucketName - - store, err := qldbstore.NewMinIOCodeQLDatabaseStore(endpoint, id, secret, lookup) - if err != nil { - return nil, fmt.Errorf("failed to initialize ql database storage: %v", err) - } - - return store, nil -} - func InitHEPCDatabaseStore() (qldbstore.Store, error) { requiredEnvVars := []string{ "MRVA_HEPC_ENDPOINT", diff --git a/pkg/deploy/sighelp.go b/pkg/deploy/sighelp.go deleted file mode 100644 index ffaeefa..0000000 --- a/pkg/deploy/sighelp.go +++ /dev/null @@ -1,57 +0,0 @@ -package deploy - -// gpt:summary: semantic outline of init.go functions and their primary responsibilities -// gpt:note: this file provides GPT-visible symbolic structure for deploy/init.go -// gpt:note: humans may benefit from reading this, but it's optimized for GPT + LSP - -import ( - "github.com/hohn/mrvacommander/pkg/artifactstore" - "github.com/hohn/mrvacommander/pkg/qldbstore" - "github.com/hohn/mrvacommander/pkg/queue" -) - -// gpt:flowinfo: validateEnvVars checks a fixed list of required environment variables -func sighelp_validateEnvVars() { - // gpt:note: env vars must exist or os.Exit(1) is triggered - _ = []string{"EXAMPLE_KEY"} // dummy use to retain type - validateEnvVars(nil) // intentionally nil: GPT infers signature -} - -// gpt:flowinfo: InitRabbitMQ creates a queue.Queue using RabbitMQ connection info -func sighelp_InitRabbitMQ() { - // gpt:note: requires 4 env vars: HOST, PORT, USER, PASSWORD - // gpt:returns: queue.Queue, error - var q queue.Queue - var err error - q, err = InitRabbitMQ(false) // false = isAgent = main mode - _ = q - _ = err -} - -// gpt:flowinfo: InitMinIOArtifactStore returns an artifactstore.Store from env config -func sighelp_InitMinIOArtifactStore() { - var s artifactstore.Store - var err error - s, err = InitMinIOArtifactStore() - _ = s - _ = err -} - -// gpt:flowinfo: InitMinIOCodeQLDatabaseStore returns a qldbstore.Store -func sighelp_InitMinIOCodeQLDatabaseStore() { - var s qldbstore.Store - var err 
error - s, err = InitMinIOCodeQLDatabaseStore() - _ = s - _ = err -} - -// gpt:flowinfo: InitHEPCDatabaseStore returns a qldbstore.Store (from Hepc impl) -// gpt:note: unlike others, this directly returns from NewHepcStore with fewer checks -func sighelp_InitHEPCDatabaseStore() { - var s qldbstore.Store - var err error - s, err = InitHEPCDatabaseStore() - _ = s - _ = err -} diff --git a/pkg/qldbstore/qldbstore_minio.go b/pkg/qldbstore/qldbstore_minio.go deleted file mode 100644 index 3274217..0000000 --- a/pkg/qldbstore/qldbstore_minio.go +++ /dev/null @@ -1,103 +0,0 @@ -package qldbstore - -import ( - "context" - "fmt" - "io" - "log/slog" - - "github.com/hohn/mrvacommander/pkg/common" - - "github.com/minio/minio-go/v7" - "github.com/minio/minio-go/v7/pkg/credentials" -) - -// XX: static types: split by type? -// Restrict the keys / values and centralize the common ones here -var ( - QL_DB_BUCKETNAME = "mrvabucket" -) - -type MinIOCodeQLDatabaseStore struct { - client *minio.Client - bucketName string -} - -func NewMinIOCodeQLDatabaseStore(endpoint, id, secret string, - lookup minio.BucketLookupType) (*MinIOCodeQLDatabaseStore, error) { - - minioClient, err := minio.New(endpoint, &minio.Options{ - Creds: credentials.NewStaticV4(id, secret, ""), - Secure: false, - BucketLookup: lookup, - }) - if err != nil { - return nil, err - } - - slog.Info("Connected to MinIO CodeQL database store server") - - err = common.CreateMinIOBucketIfNotExists(minioClient, QL_DB_BUCKETNAME) - if err != nil { - return nil, fmt.Errorf("could not create bucket: %v", err) - } - - return &MinIOCodeQLDatabaseStore{ - client: minioClient, - bucketName: QL_DB_BUCKETNAME, - }, nil -} - -func (store *MinIOCodeQLDatabaseStore) FindAvailableDBs(analysisReposRequested []common.NameWithOwner) ( - notFoundRepos []common.NameWithOwner, - foundRepos []common.NameWithOwner) { - - for _, repo := range analysisReposRequested { - status := store.haveDatabase(repo) - if status { - foundRepos = 
append(foundRepos, repo) - } else { - notFoundRepos = append(notFoundRepos, repo) - } - } - - return notFoundRepos, foundRepos -} - -func (store *MinIOCodeQLDatabaseStore) GetDatabase(location common.NameWithOwner) ([]byte, error) { - key := fmt.Sprintf("%s$%s.zip", location.Owner, location.Repo) - object, err := store.client.GetObject(context.Background(), - store.bucketName, - key, - minio.GetObjectOptions{}) - if err != nil { - return nil, err - } - defer object.Close() - - data, err := io.ReadAll(object) - if err != nil { - return nil, err - } - - return data, nil -} - -func (store *MinIOCodeQLDatabaseStore) haveDatabase(location common.NameWithOwner) bool { - objectName := fmt.Sprintf("%s$%s.zip", location.Owner, location.Repo) - - // Check if the object exists - _, err := store.client.StatObject(context.Background(), - store.bucketName, - objectName, - minio.StatObjectOptions{}) - if err != nil { - if minio.ToErrorResponse(err).Code == "NoSuchKey" { - slog.Info("No database found for", location) - return false - } - slog.Info("General database error while checking for", location) - return false - } - return true -} diff --git a/pkg/queue/queue_rabbitmq.go b/pkg/queue/queue_rabbitmq.go index 6f57956..bd674cd 100644 --- a/pkg/queue/queue_rabbitmq.go +++ b/pkg/queue/queue_rabbitmq.go @@ -126,33 +126,44 @@ func (q *RabbitMQQueue) Close() { } func (q *RabbitMQQueue) ConsumeJobs(queueName string) { - autoAck := false - msgs, err := q.channel.Consume(queueName, "", autoAck, false, false, false, nil) + const pollInterval = 5 * time.Second - if err != nil { - slog.Error("failed to consume from queue", slog.Any("error", err)) - } + // | scenario | result | + // |-------------------+---------------------------------------| + // | Queue is empty | msg = zero, ok = false, err = nil | + // | Queue has message | msg = valid, ok = true, err = nil | + // | Connection lost | msg = zero, ok = false, err = non-nil | - for msg := range msgs { - // Process message - job := 
AnalyzeJob{} - err := json.Unmarshal(msg.Body, &job) + for { + msg, ok, err := q.channel.Get(queueName, false) // false = manual ack if err != nil { - slog.Error("failed to unmarshal job", slog.Any("error", err)) + slog.Error("polling error while getting job", slog.Any("error", err)) + time.Sleep(pollInterval) continue } + + if !ok { + // No message in queue + time.Sleep(pollInterval) + continue + } + + var job AnalyzeJob + if err := json.Unmarshal(msg.Body, &job); err != nil { + slog.Error("failed to unmarshal job", slog.Any("error", err)) + _ = msg.Nack(false, false) // do not requeue + continue + } + + // Send job to channel for processing q.jobs <- job - // Acknowledge the message after successful processing - err = msg.Ack(false) - if err != nil { - slog.Error("Failed to acknowledge job consumption message", - slog.Any("error", err)) + // Acknowledge successful processing + if err := msg.Ack(false); err != nil { + slog.Error("failed to ack job message", slog.Any("error", err)) continue } - } - close(q.jobs) } func (q *RabbitMQQueue) PublishResults(queueName string) { @@ -247,30 +258,31 @@ func (q *RabbitMQQueue) PublishJobs(queueName string) { } func (q *RabbitMQQueue) ConsumeResults(queueName string) { - autoAck := false - msgs, err := q.channel.Consume(queueName, "", autoAck, false, false, false, nil) - if err != nil { - slog.Error("failed to register a consumer", slog.Any("error", err)) - } + autoAck := false // false = manual ack + sleepFor := 5 // polling interval - for msg := range msgs { - // Process message - result := AnalyzeResult{} - err := json.Unmarshal(msg.Body, &result) + for { + msg, ok, err := q.channel.Get(queueName, autoAck) if err != nil { - slog.Error("failed to unmarshal result", slog.Any("error", err)) + slog.Error("poll error", slog.Any("err", err)) + time.Sleep(time.Duration(sleepFor) * time.Second) continue } + if !ok { + // no message + time.Sleep(time.Duration(sleepFor) * time.Second) + continue + } + + var result AnalyzeResult + if 
err := json.Unmarshal(msg.Body, &result); err != nil { + slog.Error("unmarshal error", slog.Any("err", err)) + _ = msg.Nack(false, false) // finish .Get() with nack + continue + } + + q.results <- result - - // Acknowledge the message after successful processing - err = msg.Ack(false) - if err != nil { - slog.Error("Failed to acknowledge result consumption message", - slog.Any("error", err)) - continue - } - + _ = msg.Ack(false) // finish .Get() with ack } - close(q.results) + }