diff --git a/cmd/agent/main.go b/cmd/agent/main.go index 7cd3e33..fa718ce 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -103,6 +103,7 @@ func startAndMonitorWorkers(ctx context.Context, queue queue.Queue, desiredWorke stopChans = stopChans[:newWorkerCount] } currentWorkerCount = newWorkerCount + time.Sleep(monitorIntervalSec * time.Second) } } @@ -125,7 +126,7 @@ func main() { for _, envVar := range requiredEnvVars { if _, ok := os.LookupEnv(envVar); !ok { - slog.Error("Missing required environment variable %s", envVar) + slog.Error("Missing required environment variable", "key", envVar) os.Exit(1) } } @@ -143,7 +144,7 @@ func main() { slog.Info("Initializing RabbitMQ queue") - rabbitMQQueue, err := queue.InitializeRabbitMQQueue(rmqHost, int16(rmqPortAsInt), rmqUser, rmqPass) + rabbitMQQueue, err := queue.InitializeRabbitMQQueue(rmqHost, int16(rmqPortAsInt), rmqUser, rmqPass, false) if err != nil { slog.Error("failed to initialize RabbitMQ", slog.Any("error", err)) os.Exit(1) @@ -160,7 +161,6 @@ func main() { // Gracefully exit on SIGINT/SIGTERM sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - <-sigChan slog.Info("Shutting down agent") diff --git a/pkg/queue/rabbitmq.go b/pkg/queue/rabbitmq.go index f0047f3..23716e9 100644 --- a/pkg/queue/rabbitmq.go +++ b/pkg/queue/rabbitmq.go @@ -21,11 +21,19 @@ type RabbitMQQueue struct { channel *amqp.Channel } +// InitializeRabbitMQQueue initializes a RabbitMQ queue. +// It returns a pointer to a RabbitMQQueue and an error. +// +// If isAgent is true, the queue is initialized to be used by an agent. +// Otherwise, the queue is initialized to be used by the server. +// The difference in behaviour is that the agent consumes jobs and publishes results, +// while the server publishes jobs and consumes results. func InitializeRabbitMQQueue( host string, port int16, user string, password string, + isAgent bool, ) (*RabbitMQQueue, error) { const ( tryCount = 5 @@ -90,11 +98,19 @@ func InitializeRabbitMQQueue( results: make(chan common.AnalyzeResult), } - slog.Info("Starting tasks consumer") - go result.ConsumeJobs(jobsQueueName) + if isAgent { + slog.Info("Starting tasks consumer") + go result.ConsumeJobs(jobsQueueName) - slog.Info("Starting results publisher") - go result.PublishResults(resultsQueueName) + slog.Info("Starting results publisher") + go result.PublishResults(resultsQueueName) + } else { + slog.Info("Starting jobs publisher") + go result.PublishJobs(jobsQueueName) + + slog.Info("Starting results consumer") + go result.ConsumeResults(resultsQueueName) + } return &result, nil } @@ -141,7 +157,7 @@ func (q *RabbitMQQueue) PublishResults(queueName string) { } } -func (q *RabbitMQQueue) publishResult(queueName string, result interface{}) { +func (q *RabbitMQQueue) publishResult(queueName string, result common.AnalyzeResult) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -161,3 +177,48 @@ func (q *RabbitMQQueue) publishResult(queueName string, result interface{}) { slog.Error("failed to publish result", slog.Any("error", err)) } } + +func (q *RabbitMQQueue) publishJob(queueName string, job common.AnalyzeJob) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + jobBytes, err := json.Marshal(job) + if err != nil { + slog.Error("failed to marshal job", slog.Any("error", err)) + return + } + + slog.Debug("Publishing job", slog.String("job", string(jobBytes))) + err = q.channel.PublishWithContext(ctx, "", queueName, false, false, + amqp.Publishing{ + ContentType: "application/json", + Body: jobBytes, + }) + if err != nil { + slog.Error("failed to publish job", slog.Any("error", err)) + } +} + +func (q *RabbitMQQueue) PublishJobs(queueName string) { + for job := range q.jobs { + q.publishJob(queueName, job) + } +} + +func (q *RabbitMQQueue) ConsumeResults(queueName string) { + msgs, err := q.channel.Consume(queueName, "", true, false, false, false, nil) + if err != nil { + slog.Error("failed to register a consumer", slog.Any("error", err)) + } + + for msg := range msgs { + result := common.AnalyzeResult{} + err := json.Unmarshal(msg.Body, &result) + if err != nil { + slog.Error("failed to unmarshal result", slog.Any("error", err)) + continue + } + q.results <- result + } + close(q.results) +}