From 45e40abf5d58c189dafc294a5d4e0d087c26d55b Mon Sep 17 00:00:00 2001 From: Michael Hohn Date: Sat, 1 Mar 2025 11:53:55 -0800 Subject: [PATCH] section{Execution Loop in Pseudo-Code, declarative} --- doc/mrva-overview.tex | 55 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/doc/mrva-overview.tex b/doc/mrva-overview.tex index ee73925..1e92b63 100644 --- a/doc/mrva-overview.tex +++ b/doc/mrva-overview.tex @@ -395,6 +395,61 @@ while $Q \neq \emptyset$: # Server sends results for $(\mathcal{Q}, \mathcal{R}_i)$ back to the client $S$.send_results_to_client($C$, ($\mathcal{Q}$, $R_i$, $\mathcal{R}_i^{\mathcal{Q}}$)) \end{lstlisting} +\end{listing} +\FloatBarrier + +\section{Execution Loop in Pseudo-Code, declarative} +\begin{listing}[h] % h = here, t = top, b = bottom, p = page of floats + \caption{Distributed Query Execution Algorithm} + +\begin{lstlisting}[language=Python] +# Distributed Query Execution with Agent Polling and Accumulated Results + +# Define initial state +$\mathcal{R}$: set # Set of repositories +$\mathcal{Q}$: set # Set of queries +A: set # Set of agents +Q: list # Queue of $(\mathcal{Q}, \mathcal{R}_i)$ pairs +$\mathcal{R}_{\text{results}}$: dict = {} # Mapping of repositories to their accumulated query results + +# Initialize result sets for each repository +$\mathcal{R}_{\text{results}}$ = {$\mathcal{R}_i$: set() for $\mathcal{R}_i$ in $\mathcal{R}$} + +# Define job queue as an immutable mapping +Q = [($\mathcal{Q}$, $\mathcal{R}_i$) for $\mathcal{R}_i$ in $\mathcal{R}$] + +# Processing as a declarative iteration over the job queue +def execute_queries(agents, job_queue, repository_results): + def available_agents(): + return {$\alpha$ for $\alpha$ in agents if $\alpha$.is_available()} + + def process_job($\mathcal{Q}$, $\mathcal{R}_i$, $\alpha$): + results = {$\mathcal{Q}_j$: $\alpha$.execute($\mathcal{Q}_j$, $\mathcal{R}_i$) for $\mathcal{Q}_j$ in $\mathcal{Q}$} + return $\mathcal{R}_i$, results + + def accumulate_results($\mathcal{R}_{\text{results}}$, $\mathcal{R}_i$, query_results): + return {**$\mathcal{R}_{\text{results}}$, $\mathcal{R}_i$: $\mathcal{R}_{\text{results}}$[$\mathcal{R}_i$] | set().union(*query_results.values())} + + while job_queue: + active_agents = available_agents() + for $\alpha$ in active_agents: + $\mathcal{Q}$, $\mathcal{R}_i$ = job_queue[0] # Peek at the first job + _, query_results = process_job($\mathcal{Q}$, $\mathcal{R}_i$, $\alpha$) + repository_results = accumulate_results(repository_results, $\mathcal{R}_i$, query_results) + + $\alpha$.send_results(S, ($\mathcal{Q}$, $\mathcal{R}_i$, repository_results[$\mathcal{R}_i$])) + S.send_results_to_client(C, ($\mathcal{Q}$, $\mathcal{R}_i$, repository_results[$\mathcal{R}_i$])) + + job_queue = job_queue[1:] # Move to the next job + + return repository_results + +# Execute the distributed query process +$\mathcal{R}_{\text{results}}$ = execute_queries(A, Q, $\mathcal{R}_{\text{results}}$) +\end{lstlisting} + + + \end{listing} \FloatBarrier