Separate queue and agent logic and refactor
This commit is contained in:
@@ -1,73 +1,41 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"mrvacommander/pkg/agent"
|
||||||
"log"
|
|
||||||
"mrvacommander/pkg/codeql"
|
|
||||||
"mrvacommander/pkg/common"
|
|
||||||
"mrvacommander/pkg/queue"
|
"mrvacommander/pkg/queue"
|
||||||
"mrvacommander/pkg/storage"
|
|
||||||
"mrvacommander/utils"
|
|
||||||
"net/http"
|
|
||||||
"path/filepath"
|
|
||||||
"runtime"
|
|
||||||
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"flag"
|
|
||||||
"fmt"
|
|
||||||
"os"
|
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"sync"
|
"strconv"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/google/uuid"
|
"flag"
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
"os"
|
||||||
"golang.org/x/exp/slog"
|
"runtime"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/elastic/go-sysinfo"
|
"github.com/elastic/go-sysinfo"
|
||||||
|
"golang.org/x/exp/slog"
|
||||||
)
|
)
|
||||||
|
|
||||||
func downloadFile(url string, dest string) error {
|
|
||||||
resp, err := http.Get(url)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to download file: %w", err)
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
out, err := os.Create(dest)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to create file: %w", err)
|
|
||||||
}
|
|
||||||
defer out.Close()
|
|
||||||
|
|
||||||
_, err = io.Copy(out, resp.Body)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to copy file content: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func calculateWorkers() int {
|
func calculateWorkers() int {
|
||||||
const workerMemoryGB = 2
|
const workerMemoryMB = 2048 // 2 GB
|
||||||
|
|
||||||
host, err := sysinfo.Host()
|
host, err := sysinfo.Host()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("failed to get host info: %v", err)
|
slog.Error("failed to get host info", "error", err)
|
||||||
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
memInfo, err := host.Memory()
|
memInfo, err := host.Memory()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("failed to get memory info: %v", err)
|
slog.Error("failed to get memory info", "error", err)
|
||||||
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Convert total memory to GB
|
// Get available memory in MB
|
||||||
totalMemoryGB := memInfo.Available / (1024 * 1024 * 1024)
|
totalMemoryMB := memInfo.Available / (1024 * 1024)
|
||||||
|
|
||||||
// Ensure we have at least one worker
|
// Ensure we have at least one worker
|
||||||
workers := int(totalMemoryGB / workerMemoryGB)
|
workers := int(totalMemoryMB / workerMemoryMB)
|
||||||
if workers < 1 {
|
if workers < 1 {
|
||||||
workers = 1
|
workers = 1
|
||||||
}
|
}
|
||||||
@@ -81,206 +49,6 @@ func calculateWorkers() int {
|
|||||||
return workers
|
return workers
|
||||||
}
|
}
|
||||||
|
|
||||||
type RabbitMQQueue struct {
|
|
||||||
jobs chan common.AnalyzeJob
|
|
||||||
results chan common.AnalyzeResult
|
|
||||||
conn *amqp.Connection
|
|
||||||
channel *amqp.Channel
|
|
||||||
}
|
|
||||||
|
|
||||||
func InitializeQueue(jobsQueueName, resultsQueueName string) (*RabbitMQQueue, error) {
|
|
||||||
rabbitMQHost := os.Getenv("MRVA_RABBITMQ_HOST")
|
|
||||||
rabbitMQPort := os.Getenv("MRVA_RABBITMQ_PORT")
|
|
||||||
rabbitMQUser := os.Getenv("MRVA_RABBITMQ_USER")
|
|
||||||
rabbitMQPassword := os.Getenv("MRVA_RABBITMQ_PASSWORD")
|
|
||||||
|
|
||||||
if rabbitMQHost == "" || rabbitMQPort == "" || rabbitMQUser == "" || rabbitMQPassword == "" {
|
|
||||||
return nil, fmt.Errorf("RabbitMQ environment variables not set")
|
|
||||||
}
|
|
||||||
|
|
||||||
rabbitMQURL := fmt.Sprintf("amqp://%s:%s@%s:%s/", rabbitMQUser, rabbitMQPassword, rabbitMQHost, rabbitMQPort)
|
|
||||||
|
|
||||||
const (
|
|
||||||
tryCount = 5
|
|
||||||
retryDelaySec = 3
|
|
||||||
)
|
|
||||||
|
|
||||||
var conn *amqp.Connection
|
|
||||||
var err error
|
|
||||||
|
|
||||||
for i := 0; i < tryCount; i++ {
|
|
||||||
slog.Info("Attempting to connect to RabbitMQ", slog.Int("attempt", i+1))
|
|
||||||
conn, err = amqp.Dial(rabbitMQURL)
|
|
||||||
if err != nil {
|
|
||||||
slog.Warn("Failed to connect to RabbitMQ: %w", err)
|
|
||||||
if i < tryCount-1 {
|
|
||||||
slog.Info("Retrying in %d seconds", retryDelaySec)
|
|
||||||
time.Sleep(retryDelaySec * time.Second)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to connect to RabbitMQ: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
slog.Info("Connected to RabbitMQ")
|
|
||||||
|
|
||||||
ch, err := conn.Channel()
|
|
||||||
if err != nil {
|
|
||||||
conn.Close()
|
|
||||||
return nil, fmt.Errorf("failed to open a channel: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = ch.QueueDeclare(jobsQueueName, false, false, false, true, nil)
|
|
||||||
if err != nil {
|
|
||||||
conn.Close()
|
|
||||||
return nil, fmt.Errorf("failed to declare tasks queue: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = ch.QueueDeclare(resultsQueueName, false, false, false, true, nil)
|
|
||||||
if err != nil {
|
|
||||||
conn.Close()
|
|
||||||
return nil, fmt.Errorf("failed to declare results queue: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = ch.Qos(1, 0, false)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
conn.Close()
|
|
||||||
return nil, fmt.Errorf("failed to set QoS: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &RabbitMQQueue{
|
|
||||||
conn: conn,
|
|
||||||
channel: ch,
|
|
||||||
jobs: make(chan common.AnalyzeJob),
|
|
||||||
results: make(chan common.AnalyzeResult),
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (q *RabbitMQQueue) Jobs() chan common.AnalyzeJob {
|
|
||||||
return q.jobs
|
|
||||||
}
|
|
||||||
|
|
||||||
func (q *RabbitMQQueue) Results() chan common.AnalyzeResult {
|
|
||||||
return q.results
|
|
||||||
}
|
|
||||||
|
|
||||||
func (q *RabbitMQQueue) StartAnalyses(analysis_repos *map[common.NameWithOwner]storage.DBLocation, session_id int, session_language string) {
|
|
||||||
slog.Info("Queueing codeql database analyze jobs")
|
|
||||||
}
|
|
||||||
|
|
||||||
func (q *RabbitMQQueue) Close() {
|
|
||||||
q.channel.Close()
|
|
||||||
q.conn.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (q *RabbitMQQueue) ConsumeJobs(queueName string) {
|
|
||||||
msgs, err := q.channel.Consume(queueName, "", true, false, false, false, nil)
|
|
||||||
if err != nil {
|
|
||||||
slog.Error("failed to register a consumer", slog.Any("error", err))
|
|
||||||
}
|
|
||||||
|
|
||||||
for msg := range msgs {
|
|
||||||
job := common.AnalyzeJob{}
|
|
||||||
err := json.Unmarshal(msg.Body, &job)
|
|
||||||
if err != nil {
|
|
||||||
slog.Error("failed to unmarshal job", slog.Any("error", err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
q.jobs <- job
|
|
||||||
}
|
|
||||||
close(q.jobs)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (q *RabbitMQQueue) PublishResults(queueName string) {
|
|
||||||
for result := range q.results {
|
|
||||||
q.publishResult(queueName, result)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (q *RabbitMQQueue) publishResult(queueName string, result interface{}) {
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
resultBytes, err := json.Marshal(result)
|
|
||||||
if err != nil {
|
|
||||||
slog.Error("failed to marshal result", slog.Any("error", err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
slog.Info("Publishing result", slog.String("result", string(resultBytes)))
|
|
||||||
err = q.channel.PublishWithContext(ctx, "", queueName, false, false,
|
|
||||||
amqp.Publishing{
|
|
||||||
ContentType: "application/json",
|
|
||||||
Body: resultBytes,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
slog.Error("failed to publish result", slog.Any("error", err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func RunAnalysisJob(job common.AnalyzeJob) (common.AnalyzeResult, error) {
|
|
||||||
var result = common.AnalyzeResult{
|
|
||||||
RequestId: job.RequestId,
|
|
||||||
ResultCount: 0,
|
|
||||||
ResultArchiveURL: "",
|
|
||||||
Status: common.StatusError,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Log job info
|
|
||||||
slog.Info("Running analysis job", slog.Any("job", job))
|
|
||||||
|
|
||||||
// Create a temporary directory
|
|
||||||
tempDir := filepath.Join(os.TempDir(), uuid.New().String())
|
|
||||||
if err := os.MkdirAll(tempDir, 0755); err != nil {
|
|
||||||
return result, fmt.Errorf("failed to create temporary directory: %v", err)
|
|
||||||
}
|
|
||||||
defer os.RemoveAll(tempDir)
|
|
||||||
|
|
||||||
// Extract the query pack
|
|
||||||
// TODO: download from the 'job' query pack URL
|
|
||||||
utils.UntarGz("qp-54674.tgz", filepath.Join(tempDir, "qp-54674"))
|
|
||||||
|
|
||||||
// Perform the CodeQL analysis
|
|
||||||
runResult, err := codeql.RunQuery("google_flatbuffers_db.zip", "cpp", "qp-54674", tempDir)
|
|
||||||
if err != nil {
|
|
||||||
return result, fmt.Errorf("failed to run analysis: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Generate a ZIP archive containing SARIF and BQRS files
|
|
||||||
resultsArchive, err := codeql.GenerateResultsZipArchive(runResult)
|
|
||||||
if err != nil {
|
|
||||||
return result, fmt.Errorf("failed to generate results archive: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: Upload the archive to storage
|
|
||||||
slog.Info("Results archive size", slog.Int("size", len(resultsArchive)))
|
|
||||||
slog.Info("Analysis job successful.")
|
|
||||||
|
|
||||||
result = common.AnalyzeResult{
|
|
||||||
RequestId: job.RequestId,
|
|
||||||
ResultCount: runResult.ResultCount,
|
|
||||||
ResultArchiveURL: "REPLACE_THIS_WITH_STORED_RESULTS_ARCHIVE",
|
|
||||||
Status: common.StatusSuccess,
|
|
||||||
}
|
|
||||||
|
|
||||||
return result, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func RunWorker(queue queue.Queue, wg *sync.WaitGroup) {
|
|
||||||
defer wg.Done()
|
|
||||||
for job := range queue.Jobs() {
|
|
||||||
result, err := RunAnalysisJob(job)
|
|
||||||
if err != nil {
|
|
||||||
slog.Error("failed to run analysis job", slog.Any("error", err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
queue.Results() <- result
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
slog.Info("Starting agent")
|
slog.Info("Starting agent")
|
||||||
|
|
||||||
@@ -297,13 +65,27 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, envVar := range requiredEnvVars {
|
for _, envVar := range requiredEnvVars {
|
||||||
if os.Getenv(envVar) == "" {
|
if _, ok := os.LookupEnv(envVar); !ok {
|
||||||
log.Fatalf("Fatal: Missing required environment variable %s", envVar)
|
slog.Error("Missing required environment variable %s", envVar)
|
||||||
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
slog.Info("Initializing RabbitMQ connection")
|
rmqHost := os.Getenv("MRVA_RABBITMQ_HOST")
|
||||||
rabbitMQQueue, err := InitializeQueue("tasks", "results")
|
rmqPort := os.Getenv("MRVA_RABBITMQ_PORT")
|
||||||
|
rmqUser := os.Getenv("MRVA_RABBITMQ_USER")
|
||||||
|
rmqPass := os.Getenv("MRVA_RABBITMQ_PASSWORD")
|
||||||
|
|
||||||
|
rmqPortAsInt, err := strconv.Atoi(rmqPort)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("Failed to parse RabbitMQ port", slog.Any("error", err))
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
slog.Info("Initializing RabbitMQ queue")
|
||||||
|
|
||||||
|
rabbitMQQueue, err := queue.InitializeRabbitMQQueue(rmqHost, int16(rmqPortAsInt), rmqUser, rmqPass)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Error("failed to initialize RabbitMQ", slog.Any("error", err))
|
slog.Error("failed to initialize RabbitMQ", slog.Any("error", err))
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
@@ -318,21 +100,21 @@ func main() {
|
|||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
for i := 0; i < *workerCount; i++ {
|
for i := 0; i < *workerCount; i++ {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go RunWorker(rabbitMQQueue, &wg)
|
go agent.RunWorker(rabbitMQQueue, &wg)
|
||||||
}
|
}
|
||||||
|
|
||||||
slog.Info("Starting tasks consumer")
|
|
||||||
go rabbitMQQueue.ConsumeJobs("tasks")
|
|
||||||
|
|
||||||
slog.Info("Starting results publisher")
|
|
||||||
go rabbitMQQueue.PublishResults("results")
|
|
||||||
|
|
||||||
slog.Info("Agent startup complete")
|
slog.Info("Agent startup complete")
|
||||||
|
|
||||||
|
// Gracefully exit on SIGINT/SIGTERM (TODO: add job cleanup)
|
||||||
sigChan := make(chan os.Signal, 1)
|
sigChan := make(chan os.Signal, 1)
|
||||||
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
||||||
<-sigChan
|
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
<-sigChan
|
||||||
slog.Info("Shutting down agent")
|
slog.Info("Shutting down agent")
|
||||||
close(rabbitMQQueue.results)
|
rabbitMQQueue.Close()
|
||||||
|
os.Exit(0)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,20 +1,20 @@
|
|||||||
package agent
|
package agent
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
|
"mrvacommander/pkg/codeql"
|
||||||
"mrvacommander/pkg/common"
|
"mrvacommander/pkg/common"
|
||||||
"mrvacommander/pkg/logger"
|
"mrvacommander/pkg/logger"
|
||||||
"mrvacommander/pkg/qpstore"
|
"mrvacommander/pkg/qpstore"
|
||||||
"mrvacommander/pkg/queue"
|
"mrvacommander/pkg/queue"
|
||||||
"mrvacommander/pkg/storage"
|
"mrvacommander/pkg/storage"
|
||||||
"mrvacommander/utils"
|
"mrvacommander/utils"
|
||||||
|
|
||||||
"log/slog"
|
|
||||||
|
|
||||||
"fmt"
|
|
||||||
"path/filepath"
|
|
||||||
|
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
)
|
)
|
||||||
|
|
||||||
type RunnerSingle struct {
|
type RunnerSingle struct {
|
||||||
@@ -40,10 +40,12 @@ type Visibles struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *RunnerSingle) Setup(st *Visibles) {
|
func (c *RunnerSingle) Setup(st *Visibles) {
|
||||||
return
|
// TODO: implement
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *RunnerSingle) worker(wid int) {
|
func (r *RunnerSingle) worker(wid int) {
|
||||||
|
// TODO: reimplement this later
|
||||||
|
/*
|
||||||
var job common.AnalyzeJob
|
var job common.AnalyzeJob
|
||||||
|
|
||||||
for {
|
for {
|
||||||
@@ -54,88 +56,85 @@ func (r *RunnerSingle) worker(wid int) {
|
|||||||
slog.Debug("Analysis: running", "job", job)
|
slog.Debug("Analysis: running", "job", job)
|
||||||
storage.SetStatus(job.QueryPackId, job.NWO, common.StatusQueued)
|
storage.SetStatus(job.QueryPackId, job.NWO, common.StatusQueued)
|
||||||
|
|
||||||
_, err := RunAnalysis(job)
|
resultFile, err := RunAnalysis(job)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
slog.Debug("Analysis run finished", "job", job)
|
slog.Debug("Analysis run finished", "job", job)
|
||||||
|
|
||||||
res := common.AnalyzeResult{}
|
// TODO: FIX THIS
|
||||||
|
res := common.AnalyzeResult{
|
||||||
|
RunAnalysisSARIF: resultFile,
|
||||||
|
RunAnalysisBQRS: "", // FIXME ?
|
||||||
|
}
|
||||||
r.queue.Results() <- res
|
r.queue.Results() <- res
|
||||||
storage.SetStatus(job.QueryPackId, job.NWO, common.StatusSuccess)
|
storage.SetStatus(job.QueryPackId, job.NWO, common.StatusSuccess)
|
||||||
storage.SetResult(job.QueryPackId, job.NWO, res)
|
storage.SetResult(job.QueryPackId, job.NWO, res)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
}
|
}
|
||||||
|
|
||||||
func RunAnalysis(job common.AnalyzeJob) (string, error) {
|
// RunAnalysisJob runs a CodeQL analysis job (AnalyzeJob) returning an AnalyzeResult
|
||||||
// TODO Add multi-language tests including queryLanguage
|
func RunAnalysisJob(job common.AnalyzeJob) (common.AnalyzeResult, error) {
|
||||||
// queryPackID, queryLanguage, dbOwner, dbRepo :=
|
var result = common.AnalyzeResult{
|
||||||
// job.QueryPackId, job.QueryLanguage, job.NWO.Owner, job.NWO.Repo
|
RequestId: job.RequestId,
|
||||||
queryPackID, dbOwner, dbRepo :=
|
ResultCount: 0,
|
||||||
job.QueryPackId, job.NWO.Owner, job.NWO.Repo
|
ResultArchiveURL: "",
|
||||||
|
Status: common.StatusError,
|
||||||
serverRoot := os.Getenv("MRVA_SERVER_ROOT")
|
|
||||||
|
|
||||||
// Set up derived paths
|
|
||||||
dbPath := filepath.Join(serverRoot, "var/codeql/dbs", dbOwner, dbRepo)
|
|
||||||
dbZip := filepath.Join(serverRoot, "codeql/dbs", dbOwner, dbRepo,
|
|
||||||
fmt.Sprintf("%s_%s_db.zip", dbOwner, dbRepo))
|
|
||||||
dbExtract := filepath.Join(serverRoot, "var/codeql/dbs", dbOwner, dbRepo)
|
|
||||||
|
|
||||||
queryPack := filepath.Join(serverRoot,
|
|
||||||
"var/codeql/querypacks", fmt.Sprintf("qp-%d.tgz", queryPackID))
|
|
||||||
queryExtract := filepath.Join(serverRoot,
|
|
||||||
"var/codeql/querypacks", fmt.Sprintf("qp-%d", queryPackID))
|
|
||||||
|
|
||||||
queryOutDir := filepath.Join(serverRoot,
|
|
||||||
"var/codeql/sarif/localrun", dbOwner, dbRepo)
|
|
||||||
queryOutFile := filepath.Join(queryOutDir,
|
|
||||||
fmt.Sprintf("%s_%s.sarif", dbOwner, dbRepo))
|
|
||||||
|
|
||||||
// Prepare directory, extract database
|
|
||||||
if err := os.MkdirAll(dbExtract, 0755); err != nil {
|
|
||||||
slog.Error("Failed to create DB directory %s: %v", dbExtract, err)
|
|
||||||
return "", err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := utils.UnzipFile(dbZip, dbExtract); err != nil {
|
// Create a temporary directory
|
||||||
slog.Error("Failed to unzip DB", dbZip, err)
|
tempDir := filepath.Join(os.TempDir(), uuid.New().String())
|
||||||
return "", err
|
if err := os.MkdirAll(tempDir, 0755); err != nil {
|
||||||
|
return result, fmt.Errorf("failed to create temporary directory: %v", err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(tempDir)
|
||||||
|
|
||||||
|
// Extract the query pack
|
||||||
|
// TODO: download from the 'job' query pack URL
|
||||||
|
// utils.downloadFile
|
||||||
|
queryPackPath := filepath.Join(tempDir, "qp-54674")
|
||||||
|
utils.UntarGz("qp-54674.tgz", queryPackPath)
|
||||||
|
|
||||||
|
// Perform the CodeQL analysis
|
||||||
|
runResult, err := codeql.RunQuery("google_flatbuffers_db.zip", "cpp", queryPackPath, tempDir)
|
||||||
|
if err != nil {
|
||||||
|
return result, fmt.Errorf("failed to run analysis: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Prepare directory, extract query pack
|
// Generate a ZIP archive containing SARIF and BQRS files
|
||||||
if err := os.MkdirAll(queryExtract, 0755); err != nil {
|
resultsArchive, err := codeql.GenerateResultsZipArchive(runResult)
|
||||||
slog.Error("Failed to create query pack directory %s: %v", queryExtract, err)
|
if err != nil {
|
||||||
return "", err
|
return result, fmt.Errorf("failed to generate results archive: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := utils.UntarGz(queryPack, queryExtract); err != nil {
|
// TODO: Upload the archive to storage
|
||||||
slog.Error("Failed to extract querypack %s: %v", queryPack, err)
|
slog.Info("Results archive size", slog.Int("size", len(resultsArchive)))
|
||||||
return "", err
|
slog.Info("Analysis job successful.")
|
||||||
|
|
||||||
|
result = common.AnalyzeResult{
|
||||||
|
RequestId: job.RequestId,
|
||||||
|
ResultCount: runResult.ResultCount,
|
||||||
|
ResultArchiveURL: "REPLACE_THIS_WITH_STORED_RESULTS_ARCHIVE", // TODO
|
||||||
|
Status: common.StatusSuccess,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Prepare query result directory
|
return result, nil
|
||||||
if err := os.MkdirAll(queryOutDir, 0755); err != nil {
|
|
||||||
slog.Error("Failed to create query result directory %s: %v", queryOutDir, err)
|
|
||||||
return "", err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run database analyze
|
// RunWorker runs a worker that processes jobs from queue
|
||||||
cmd := exec.Command("codeql", "database", "analyze",
|
func RunWorker(queue queue.Queue, wg *sync.WaitGroup) {
|
||||||
"--format=sarif-latest", "--rerun", "--output", queryOutFile,
|
defer wg.Done()
|
||||||
"-j8", dbPath, queryExtract)
|
for job := range queue.Jobs() {
|
||||||
cmd.Dir = serverRoot
|
slog.Info("Running analysis job", slog.Any("job", job))
|
||||||
cmd.Stdout = os.Stdout
|
result, err := RunAnalysisJob(job)
|
||||||
cmd.Stderr = os.Stderr
|
if err != nil {
|
||||||
|
slog.Error("Failed to run analysis job", slog.Any("error", err))
|
||||||
if err := cmd.Run(); err != nil {
|
continue
|
||||||
slog.Error("codeql database analyze failed:", "error", err, "job", job)
|
}
|
||||||
storage.SetStatus(job.QueryPackId, job.NWO, common.StatusError)
|
slog.Info("Analysis job completed", slog.Any("result", result))
|
||||||
return "", err
|
queue.Results() <- result
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return result path
|
|
||||||
return queryOutFile, nil
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,7 +16,6 @@ import (
|
|||||||
"gopkg.in/yaml.v3"
|
"gopkg.in/yaml.v3"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Helper Functions
|
|
||||||
func contains(slice []string, item string) bool {
|
func contains(slice []string, item string) bool {
|
||||||
for _, s := range slice {
|
for _, s := range slice {
|
||||||
if s == item {
|
if s == item {
|
||||||
@@ -26,7 +25,6 @@ func contains(slice []string, item string) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// Main Functions
|
|
||||||
func getCodeQLCLIPath() (string, error) {
|
func getCodeQLCLIPath() (string, error) {
|
||||||
// get the CODEQL_CLI_PATH environment variable
|
// get the CODEQL_CLI_PATH environment variable
|
||||||
codeqlCliPath := os.Getenv("CODEQL_CLI_PATH")
|
codeqlCliPath := os.Getenv("CODEQL_CLI_PATH")
|
||||||
|
|||||||
163
pkg/queue/rabbitmq.go
Normal file
163
pkg/queue/rabbitmq.go
Normal file
@@ -0,0 +1,163 @@
|
|||||||
|
package queue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"mrvacommander/pkg/common"
|
||||||
|
"mrvacommander/pkg/storage"
|
||||||
|
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
amqp "github.com/rabbitmq/amqp091-go"
|
||||||
|
"golang.org/x/exp/slog"
|
||||||
|
)
|
||||||
|
|
||||||
|
type RabbitMQQueue struct {
|
||||||
|
jobs chan common.AnalyzeJob
|
||||||
|
results chan common.AnalyzeResult
|
||||||
|
conn *amqp.Connection
|
||||||
|
channel *amqp.Channel
|
||||||
|
}
|
||||||
|
|
||||||
|
func InitializeRabbitMQQueue(
|
||||||
|
host string,
|
||||||
|
port int16,
|
||||||
|
user string,
|
||||||
|
password string,
|
||||||
|
) (*RabbitMQQueue, error) {
|
||||||
|
const (
|
||||||
|
tryCount = 5
|
||||||
|
retryDelaySec = 3
|
||||||
|
jobsQueueName = "tasks"
|
||||||
|
resultsQueueName = "results"
|
||||||
|
)
|
||||||
|
|
||||||
|
var conn *amqp.Connection
|
||||||
|
var err error
|
||||||
|
|
||||||
|
rabbitMQURL := fmt.Sprintf("amqp://%s:%s@%s:%d/", user, password, host, port)
|
||||||
|
|
||||||
|
for i := 0; i < tryCount; i++ {
|
||||||
|
slog.Info("Attempting to connect to RabbitMQ", slog.Int("attempt", i+1))
|
||||||
|
conn, err = amqp.Dial(rabbitMQURL)
|
||||||
|
if err != nil {
|
||||||
|
slog.Warn("Failed to connect to RabbitMQ", "error", err)
|
||||||
|
if i < tryCount-1 {
|
||||||
|
slog.Info("Retrying", "seconds", retryDelaySec)
|
||||||
|
time.Sleep(retryDelaySec * time.Second)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// successfully connected to RabbitMQ
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to connect: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
slog.Info("Connected to RabbitMQ")
|
||||||
|
|
||||||
|
ch, err := conn.Channel()
|
||||||
|
if err != nil {
|
||||||
|
conn.Close()
|
||||||
|
return nil, fmt.Errorf("failed to open a channel: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = ch.QueueDeclare(jobsQueueName, false, false, false, true, nil)
|
||||||
|
if err != nil {
|
||||||
|
conn.Close()
|
||||||
|
return nil, fmt.Errorf("failed to declare tasks queue: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = ch.QueueDeclare(resultsQueueName, false, false, false, true, nil)
|
||||||
|
if err != nil {
|
||||||
|
conn.Close()
|
||||||
|
return nil, fmt.Errorf("failed to declare results queue: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = ch.Qos(1, 0, false)
|
||||||
|
if err != nil {
|
||||||
|
conn.Close()
|
||||||
|
return nil, fmt.Errorf("failed to set QoS: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
result := RabbitMQQueue{
|
||||||
|
conn: conn,
|
||||||
|
channel: ch,
|
||||||
|
jobs: make(chan common.AnalyzeJob),
|
||||||
|
results: make(chan common.AnalyzeResult),
|
||||||
|
}
|
||||||
|
|
||||||
|
slog.Info("Starting tasks consumer")
|
||||||
|
go result.ConsumeJobs(jobsQueueName)
|
||||||
|
|
||||||
|
slog.Info("Starting results publisher")
|
||||||
|
go result.PublishResults(resultsQueueName)
|
||||||
|
|
||||||
|
return &result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *RabbitMQQueue) Jobs() chan common.AnalyzeJob {
|
||||||
|
return q.jobs
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *RabbitMQQueue) Results() chan common.AnalyzeResult {
|
||||||
|
return q.results
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *RabbitMQQueue) StartAnalyses(analysis_repos *map[common.NameWithOwner]storage.DBLocation, session_id int, session_language string) {
|
||||||
|
// TODO: Implement
|
||||||
|
log.Fatal("unimplemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *RabbitMQQueue) Close() {
|
||||||
|
q.channel.Close()
|
||||||
|
q.conn.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *RabbitMQQueue) ConsumeJobs(queueName string) {
|
||||||
|
msgs, err := q.channel.Consume(queueName, "", true, false, false, false, nil)
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("failed to register a consumer", slog.Any("error", err))
|
||||||
|
}
|
||||||
|
|
||||||
|
for msg := range msgs {
|
||||||
|
job := common.AnalyzeJob{}
|
||||||
|
err := json.Unmarshal(msg.Body, &job)
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("failed to unmarshal job", slog.Any("error", err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
q.jobs <- job
|
||||||
|
}
|
||||||
|
close(q.jobs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *RabbitMQQueue) PublishResults(queueName string) {
|
||||||
|
for result := range q.results {
|
||||||
|
q.publishResult(queueName, result)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *RabbitMQQueue) publishResult(queueName string, result interface{}) {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
resultBytes, err := json.Marshal(result)
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("failed to marshal result", slog.Any("error", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
slog.Info("Publishing result", slog.String("result", string(resultBytes)))
|
||||||
|
err = q.channel.PublishWithContext(ctx, "", queueName, false, false,
|
||||||
|
amqp.Publishing{
|
||||||
|
ContentType: "application/json",
|
||||||
|
Body: resultBytes,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("failed to publish result", slog.Any("error", err))
|
||||||
|
}
|
||||||
|
}
|
||||||
29
utils/download.go
Normal file
29
utils/download.go
Normal file
@@ -0,0 +1,29 @@
|
|||||||
|
package utils
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
)
|
||||||
|
|
||||||
|
func downloadFile(url string, dest string) error {
|
||||||
|
resp, err := http.Get(url)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to download file: %w", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
out, err := os.Create(dest)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create file: %w", err)
|
||||||
|
}
|
||||||
|
defer out.Close()
|
||||||
|
|
||||||
|
_, err = io.Copy(out, resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to copy file content: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user