Files
mrvacommander/pkg/queue/rabbitmq.go
2024-06-17 11:30:46 +02:00

225 lines
5.5 KiB
Go

package queue
import (
"mrvacommander/pkg/common"
"mrvacommander/pkg/storage"
"context"
"encoding/json"
"fmt"
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
"golang.org/x/exp/slog"
)
type RabbitMQQueue struct {
jobs chan common.AnalyzeJob
results chan common.AnalyzeResult
conn *amqp.Connection
channel *amqp.Channel
}
// InitializeRabbitMQQueue initializes a RabbitMQ queue.
// It returns a pointer to a RabbitMQQueue and an error.
//
// If isAgent is true, the queue is initialized to be used by an agent.
// Otherwise, the queue is initialized to be used by the server.
// The difference in behaviour is that the agent consumes jobs and publishes results,
// while the server publishes jobs and consumes results.
func InitializeRabbitMQQueue(
host string,
port int16,
user string,
password string,
isAgent bool,
) (*RabbitMQQueue, error) {
const (
tryCount = 5
retryDelaySec = 3
jobsQueueName = "tasks"
resultsQueueName = "results"
)
var conn *amqp.Connection
var err error
rabbitMQURL := fmt.Sprintf("amqp://%s:%s@%s:%d/", user, password, host, port)
for i := 0; i < tryCount; i++ {
slog.Info("Attempting to connect to RabbitMQ", slog.Int("attempt", i+1))
conn, err = amqp.Dial(rabbitMQURL)
if err != nil {
slog.Warn("Failed to connect to RabbitMQ", "error", err)
if i < tryCount-1 {
slog.Info("Retrying", "seconds", retryDelaySec)
time.Sleep(retryDelaySec * time.Second)
}
} else {
// successfully connected to RabbitMQ
break
}
}
if err != nil {
return nil, fmt.Errorf("failed to connect: %w", err)
}
slog.Info("Connected to RabbitMQ")
ch, err := conn.Channel()
if err != nil {
conn.Close()
return nil, fmt.Errorf("failed to open a channel: %w", err)
}
_, err = ch.QueueDeclare(jobsQueueName, false, false, false, true, nil)
if err != nil {
conn.Close()
return nil, fmt.Errorf("failed to declare tasks queue: %w", err)
}
_, err = ch.QueueDeclare(resultsQueueName, false, false, false, true, nil)
if err != nil {
conn.Close()
return nil, fmt.Errorf("failed to declare results queue: %w", err)
}
err = ch.Qos(1, 0, false)
if err != nil {
conn.Close()
return nil, fmt.Errorf("failed to set QoS: %w", err)
}
result := RabbitMQQueue{
conn: conn,
channel: ch,
jobs: make(chan common.AnalyzeJob),
results: make(chan common.AnalyzeResult),
}
if isAgent {
slog.Info("Starting tasks consumer")
go result.ConsumeJobs(jobsQueueName)
slog.Info("Starting results publisher")
go result.PublishResults(resultsQueueName)
} else {
slog.Info("Starting jobs publisher")
go result.PublishJobs(jobsQueueName)
slog.Info("Starting results consumer")
go result.ConsumeResults(resultsQueueName)
}
return &result, nil
}
func (q *RabbitMQQueue) Jobs() chan common.AnalyzeJob {
return q.jobs
}
func (q *RabbitMQQueue) Results() chan common.AnalyzeResult {
return q.results
}
func (q *RabbitMQQueue) StartAnalyses(analysis_repos *map[common.NameWithOwner]storage.DBLocation, session_id int, session_language string) {
// TODO: Implement
log.Fatal("unimplemented")
}
func (q *RabbitMQQueue) Close() {
q.channel.Close()
q.conn.Close()
}
func (q *RabbitMQQueue) ConsumeJobs(queueName string) {
msgs, err := q.channel.Consume(queueName, "", true, false, false, false, nil)
if err != nil {
slog.Error("failed to register a consumer", slog.Any("error", err))
}
for msg := range msgs {
job := common.AnalyzeJob{}
err := json.Unmarshal(msg.Body, &job)
if err != nil {
slog.Error("failed to unmarshal job", slog.Any("error", err))
continue
}
q.jobs <- job
}
close(q.jobs)
}
func (q *RabbitMQQueue) PublishResults(queueName string) {
for result := range q.results {
q.publishResult(queueName, result)
}
}
func (q *RabbitMQQueue) publishResult(queueName string, result common.AnalyzeResult) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
resultBytes, err := json.Marshal(result)
if err != nil {
slog.Error("failed to marshal result", slog.Any("error", err))
return
}
slog.Debug("Publishing result", slog.String("result", string(resultBytes)))
err = q.channel.PublishWithContext(ctx, "", queueName, false, false,
amqp.Publishing{
ContentType: "application/json",
Body: resultBytes,
})
if err != nil {
slog.Error("failed to publish result", slog.Any("error", err))
}
}
func (q *RabbitMQQueue) publishJob(queueName string, job common.AnalyzeJob) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
jobBytes, err := json.Marshal(job)
if err != nil {
slog.Error("failed to marshal job", slog.Any("error", err))
return
}
slog.Debug("Publishing job", slog.String("job", string(jobBytes)))
err = q.channel.PublishWithContext(ctx, "", queueName, false, false,
amqp.Publishing{
ContentType: "application/json",
Body: jobBytes,
})
if err != nil {
slog.Error("failed to publish job", slog.Any("error", err))
}
}
func (q *RabbitMQQueue) PublishJobs(queueName string) {
for job := range q.jobs {
q.publishJob(queueName, job)
}
}
func (q *RabbitMQQueue) ConsumeResults(queueName string) {
msgs, err := q.channel.Consume(queueName, "", true, false, false, false, nil)
if err != nil {
slog.Error("failed to register a consumer", slog.Any("error", err))
}
for msg := range msgs {
result := common.AnalyzeResult{}
err := json.Unmarshal(msg.Body, &result)
if err != nil {
slog.Error("failed to unmarshal result", slog.Any("error", err))
continue
}
q.results <- result
}
close(q.results)
}