From 4d52176c5a4565b4c13283cd3d5040b6b66f5af9 Mon Sep 17 00:00:00 2001 From: Michael Hohn Date: Thu, 14 Nov 2024 12:04:18 -0800 Subject: [PATCH] Add Publisher Confirms and Consumer Acknowledgements to rabbitmq channels Also updated the end-to-end workflow The confirmation channel size is intentionally very large to prevent blocking the server or agents. --- notes/cli-end-to-end-detailed.org | 100 +++++++++++++++++------------- pkg/queue/queue_rabbitmq.go | 70 +++++++++++++++++++-- 2 files changed, 121 insertions(+), 49 deletions(-) diff --git a/notes/cli-end-to-end-detailed.org b/notes/cli-end-to-end-detailed.org index eafcfec..662d61f 100644 --- a/notes/cli-end-to-end-detailed.org +++ b/notes/cli-end-to-end-detailed.org @@ -336,6 +336,7 @@ databases. #+BEGIN_SRC sh cd ~/work-gh/mrva/mrvacommander/client/qldbtools + . venv/bin/activate ./bin/mc-rows-from-mrva-list scratch/gh-mrva-selection.json \ scratch/db-info-3.csv > scratch/selection-full-info csvcut -c path scratch/selection-full-info @@ -346,7 +347,7 @@ cd ~/work-gh/mrva/gh-mrva/ code gh-mrva.code-workspace #+END_SRC - In this case, the trivial =findPrintf=: + In this case, the trivial =findPrintf= query, in the file =Fprintf.ql= #+BEGIN_SRC java /** ,* @name findPrintf @@ -374,20 +375,21 @@ ~/work-gh/mrva/gh-mrva/gh-mrva-selection.json cd ~/work-gh/mrva/gh-mrva/ - ./gh-mrva submit --language cpp --session mirva-session-1480 \ + ./gh-mrva submit --language cpp --session mirva-session-3650 \ --list mirva-list \ --query ~/work-gh/mrva/gh-mrva/Fprintf.ql #+END_SRC + 4. Check the status #+BEGIN_SRC sh cd ~/work-gh/mrva/gh-mrva/ - ./gh-mrva status --session mirva-session-1480 + ./gh-mrva status --session mirva-session-3650 #+END_SRC This time we have results #+BEGIN_SRC text - ... - Run name: mirva-session-1480 + ... + 0:$ Run name: mirva-session-3650 Status: succeeded Total runs: 1 Total successful scans: 11 @@ -397,68 +399,78 @@ Total skipped repositories due to not found: 0 Total skipped repositories due to no database: 0 Total skipped repositories due to over limit: 0 - Total repositories with findings: 7 - Total findings: 618 + Total repositories with findings: 8 + Total findings: 7055 Repositories with findings: - quickfix/quickfixctsjebfd13 (cpp-fprintf-call): 5 - libfuse/libfusectsj7a66a4 (cpp-fprintf-call): 146 - xoreaxeaxeax/movfuscatorctsj8f7e5b (cpp-fprintf-call): 80 - pocoproject/pococtsj26b932 (cpp-fprintf-call): 17 - BoomingTech/Piccoloctsj6d7177 (cpp-fprintf-call): 10 - tdlib/telegram-bot-apictsj8529d9 (cpp-fprintf-call): 247 - WinMerge/winmergectsj101305 (cpp-fprintf-call): 113 + lz4/lz4ctsj2479c5 (cpp-fprintf-call): 307 + Mbed-TLS/mbedtlsctsj17ef85 (cpp-fprintf-call): 6464 + tsl0922/ttydctsj2e3faa (cpp-fprintf-call): 11 + medooze/media-server-nodectsj5e30b3 (cpp-fprintf-call): 105 + ampl/gslctsj4b270e (cpp-fprintf-call): 102 + baidu/sofa-pbrpcctsjba3501 (cpp-fprintf-call): 24 + dlundquist/sniproxyctsj3d83e7 (cpp-fprintf-call): 34 + hyprwm/Hyprlandctsjc2425f (cpp-fprintf-call): 8 #+END_SRC + 5. Download the sarif files, optionally also get databases. + + #+BEGIN_SRC sh cd ~/work-gh/mrva/gh-mrva/ # Just download the sarif files - ./gh-mrva download --session mirva-session-1480 \ - --output-dir mirva-session-1480 + ./gh-mrva download --session mirva-session-3650 \ + --output-dir mirva-session-3650 # Download the sarif files and CodeQL dbs - ./gh-mrva download --session mirva-session-1480 \ + ./gh-mrva download --session mirva-session-3650 \ --download-dbs \ - --output-dir mirva-session-1480 - + --output-dir mirva-session-3650 + #+END_SRC + #+BEGIN_SRC sh # And list them: - \ls -la *1480* - -rwxr-xr-x@ 1 hohn staff 1915857 Aug 16 14:10 BoomingTech_Piccoloctsj6d7177_1.sarif - drwxr-xr-x@ 3 hohn staff 96 Aug 16 14:15 BoomingTech_Piccoloctsj6d7177_1_db - -rwxr-xr-x@ 1 hohn staff 89857056 Aug 16 14:11 BoomingTech_Piccoloctsj6d7177_1_db.zip - -rwxr-xr-x@ 1 hohn staff 3105663 Aug 16 14:10 WinMerge_winmergectsj101305_1.sarif - -rwxr-xr-x@ 1 hohn staff 227812131 Aug 16 14:12 WinMerge_winmergectsj101305_1_db.zip - -rwxr-xr-x@ 1 hohn staff 193976 Aug 16 14:10 libfuse_libfusectsj7a66a4_1.sarif - -rwxr-xr-x@ 1 hohn staff 12930693 Aug 16 14:10 libfuse_libfusectsj7a66a4_1_db.zip - -rwxr-xr-x@ 1 hohn staff 1240694 Aug 16 14:10 pocoproject_pococtsj26b932_1.sarif - -rwxr-xr-x@ 1 hohn staff 158924920 Aug 16 14:12 pocoproject_pococtsj26b932_1_db.zip - -rwxr-xr-x@ 1 hohn staff 888494 Aug 16 14:10 quickfix_quickfixctsjebfd13_1.sarif - -rwxr-xr-x@ 1 hohn staff 75023303 Aug 16 14:11 quickfix_quickfixctsjebfd13_1_db.zip - -rwxr-xr-x@ 1 hohn staff 1487363 Aug 16 14:10 tdlib_telegram-bot-apictsj8529d9_1.sarif - -rwxr-xr-x@ 1 hohn staff 373477635 Aug 16 14:14 tdlib_telegram-bot-apictsj8529d9_1_db.zip - -rwxr-xr-x@ 1 hohn staff 103657 Aug 16 14:10 xoreaxeaxeax_movfuscatorctsj8f7e5b_1.sarif - -rwxr-xr-x@ 1 hohn staff 9464225 Aug 16 14:10 xoreaxeaxeax_movfuscatorctsj8f7e5b_1_db.zip + \ls -la *3650* + drwxr-xr-x@ 18 hohn staff 576 Nov 14 11:54 . + drwxrwxr-x@ 56 hohn staff 1792 Nov 14 11:54 .. + -rwxr-xr-x@ 1 hohn staff 9035554 Nov 14 11:54 Mbed-TLS_mbedtlsctsj17ef85_1.sarif + -rwxr-xr-x@ 1 hohn staff 57714273 Nov 14 11:54 Mbed-TLS_mbedtlsctsj17ef85_1_db.zip + -rwxr-xr-x@ 1 hohn staff 132484 Nov 14 11:54 ampl_gslctsj4b270e_1.sarif + -rwxr-xr-x@ 1 hohn staff 99234414 Nov 14 11:54 ampl_gslctsj4b270e_1_db.zip + -rwxr-xr-x@ 1 hohn staff 34419 Nov 14 11:54 baidu_sofa-pbrpcctsjba3501_1.sarif + -rwxr-xr-x@ 1 hohn staff 55177796 Nov 14 11:54 baidu_sofa-pbrpcctsjba3501_1_db.zip + -rwxr-xr-x@ 1 hohn staff 80744 Nov 14 11:54 dlundquist_sniproxyctsj3d83e7_1.sarif + -rwxr-xr-x@ 1 hohn staff 2183836 Nov 14 11:54 dlundquist_sniproxyctsj3d83e7_1_db.zip + -rwxr-xr-x@ 1 hohn staff 169079 Nov 14 11:54 hyprwm_Hyprlandctsjc2425f_1.sarif + -rwxr-xr-x@ 1 hohn staff 21383303 Nov 14 11:54 hyprwm_Hyprlandctsjc2425f_1_db.zip + -rwxr-xr-x@ 1 hohn staff 489064 Nov 14 11:54 lz4_lz4ctsj2479c5_1.sarif + -rwxr-xr-x@ 1 hohn staff 2991310 Nov 14 11:54 lz4_lz4ctsj2479c5_1_db.zip + -rwxr-xr-x@ 1 hohn staff 141336 Nov 14 11:54 medooze_media-server-nodectsj5e30b3_1.sarif + -rwxr-xr-x@ 1 hohn staff 38217703 Nov 14 11:54 medooze_media-server-nodectsj5e30b3_1_db.zip + -rwxr-xr-x@ 1 hohn staff 33861 Nov 14 11:54 tsl0922_ttydctsj2e3faa_1.sarif + -rwxr-xr-x@ 1 hohn staff 5140183 Nov 14 11:54 tsl0922_ttydctsj2e3faa_1_db.zip #+END_SRC 6. Use the [[https://marketplace.visualstudio.com/items?itemName=MS-SarifVSCode.sarif-viewer][SARIF Viewer]] plugin in VS Code to open and review the results. Prepare the source directory so the viewer can be pointed at it #+BEGIN_SRC sh - cd ~/work-gh/mrva/gh-mrva/mirva-session-1480 + cd ~/work-gh/mrva/gh-mrva/mirva-session-3650 - unzip -qd BoomingTech_Piccoloctsj6d7177_1_db BoomingTech_Piccoloctsj6d7177_1_db.zip + unzip -qd ampl_gslctsj4b270e_1_db ampl_gslctsj4b270e_1_db.zip - cd BoomingTech_Piccoloctsj6d7177_1_db/codeql_db/ - unzip -qd src src.zip + cd ampl_gslctsj4b270e_1_db/codeql_db + unzip -qd src src.zip #+END_SRC - Use the viewer + Use the viewer in VS Code #+BEGIN_SRC sh - code BoomingTech_Piccoloctsj6d7177_1.sarif + cd ~/work-gh/mrva/gh-mrva/mirva-session-3650 + code ampl_gslctsj4b270e_1.sarif - # For lauxlib.c, point the source viewer to - find ~/work-gh/mrva/gh-mrva/mirva-session-1480/BoomingTech_Piccoloctsj6d7177_1_db/codeql_db/src/home/runner/work/bulk-builder/bulk-builder -name lauxlib.c + # For the file vegas.c, when asked, point the source viewer to + find ~/work-gh/mrva/gh-mrva/mirva-session-3650/ampl_gslctsj4b270e_1_db/codeql_db/src/\ + -name vegas.c - # Here: ~/work-gh/mrva/gh-mrva/mirva-session-1480/BoomingTech_Piccoloctsj6d7177_1_db/codeql_db/src/home/runner/work/bulk-builder/bulk-builder/engine/3rdparty/lua-5.4.4/lauxlib.c + # Here: ~/work-gh/mrva/gh-mrva/mirva-session-3650/ampl_gslctsj4b270e_1_db/codeql_db/src//home/runner/work/bulk-builder/bulk-builder/monte/vegas.c #+END_SRC 7. (optional) Large result sets are more easily filtered via diff --git a/pkg/queue/queue_rabbitmq.go b/pkg/queue/queue_rabbitmq.go index eba861e..6f57956 100644 --- a/pkg/queue/queue_rabbitmq.go +++ b/pkg/queue/queue_rabbitmq.go @@ -32,8 +32,8 @@ func NewRabbitMQQueue( isAgent bool, ) (*RabbitMQQueue, error) { const ( - tryCount = 5 - retryDelaySec = 3 + tryCount = 5 + retryDelaySec = 3 // XX: static typing? jobsQueueName = "tasks" resultsQueueName = "results" @@ -126,12 +126,15 @@ func (q *RabbitMQQueue) Close() { } func (q *RabbitMQQueue) ConsumeJobs(queueName string) { - msgs, err := q.channel.Consume(queueName, "", true, false, false, false, nil) + autoAck := false + msgs, err := q.channel.Consume(queueName, "", autoAck, false, false, false, nil) + if err != nil { - slog.Error("failed to register a consumer", slog.Any("error", err)) + slog.Error("failed to consume from queue", slog.Any("error", err)) } for msg := range msgs { + // Process message job := AnalyzeJob{} err := json.Unmarshal(msg.Body, &job) if err != nil { @@ -139,6 +142,15 @@ func (q *RabbitMQQueue) ConsumeJobs(queueName string) { continue } q.jobs <- job + + // Acknowledge the message after successful processing + err = msg.Ack(false) + if err != nil { + slog.Error("Failed to acknowledge job consumption message", + slog.Any("error", err)) + continue + } + } close(q.jobs) } @@ -159,6 +171,17 @@ func (q *RabbitMQQueue) publishResult(queueName string, result AnalyzeResult) { return } + // Enable publisher confirms on the channel + err = q.channel.Confirm(false) + if err != nil { + slog.Error("Failed to enable publisher confirms", slog.Any("error", err)) + } + + // Set up a confirmation channel. This uses a large capacity to avoid blocking. + confirmChannelSize := 99999 + confirmations := q.channel.NotifyPublish(make(chan amqp.Confirmation, confirmChannelSize)) + + // Publish the message slog.Debug("Publishing result", slog.String("result", string(resultBytes))) err = q.channel.PublishWithContext(ctx, "", queueName, false, false, amqp.Publishing{ @@ -168,6 +191,13 @@ func (q *RabbitMQQueue) publishResult(queueName string, result AnalyzeResult) { if err != nil { slog.Error("failed to publish result", slog.Any("error", err)) } + + // Wait for the confirmation + confirm := <-confirmations + if !confirm.Ack { + slog.Error("Publish result message confirmation failed") + } + } func (q *RabbitMQQueue) publishJob(queueName string, job AnalyzeJob) { @@ -180,6 +210,18 @@ func (q *RabbitMQQueue) publishJob(queueName string, job AnalyzeJob) { return } + // Enable publisher confirms on the channel + err = q.channel.Confirm(false) + if err != nil { + slog.Error("Failed to enable publisher confirms", slog.Any("error", err)) + } + + // Set up a confirmation channel. This uses a large capacity to avoid + // blocking server requests. + confirmChannelSize := 99999 + confirmations := q.channel.NotifyPublish(make(chan amqp.Confirmation, confirmChannelSize)) + + // Publish the job slog.Debug("Publishing job", slog.String("job", string(jobBytes))) err = q.channel.PublishWithContext(ctx, "", queueName, false, false, amqp.Publishing{ @@ -189,6 +231,13 @@ func (q *RabbitMQQueue) publishJob(queueName string, job AnalyzeJob) { if err != nil { slog.Error("failed to publish job", slog.Any("error", err)) } + + // Wait for the confirmation + confirm := <-confirmations + if !confirm.Ack { + slog.Error("Publish result message confirmation failed") + } + } func (q *RabbitMQQueue) PublishJobs(queueName string) { @@ -198,12 +247,14 @@ func (q *RabbitMQQueue) PublishJobs(queueName string) { } func (q *RabbitMQQueue) ConsumeResults(queueName string) { - msgs, err := q.channel.Consume(queueName, "", true, false, false, false, nil) + autoAck := false + msgs, err := q.channel.Consume(queueName, "", autoAck, false, false, false, nil) if err != nil { slog.Error("failed to register a consumer", slog.Any("error", err)) } for msg := range msgs { + // Process message result := AnalyzeResult{} err := json.Unmarshal(msg.Body, &result) if err != nil { @@ -211,6 +262,15 @@ func (q *RabbitMQQueue) ConsumeResults(queueName string) { continue } q.results <- result + + // Acknowledge the message after successful processing + err = msg.Ack(false) + if err != nil { + slog.Error("Failed to acknowledge result consumption message", + slog.Any("error", err)) + continue + } + } close(q.results) }