section{Execution Loop in Pseudo-Code, declarative}

This commit is contained in:
Michael Hohn
2025-03-01 11:53:55 -08:00
committed by =Michael Hohn
parent a3593cbba2
commit 45e40abf5d

View File

@@ -395,6 +395,61 @@ while $Q \neq \emptyset$:
# Server sends results for $(\mathcal{Q}, \mathcal{R}_i)$ back to the client # Server sends results for $(\mathcal{Q}, \mathcal{R}_i)$ back to the client
$S$.send_results_to_client($C$, ($\mathcal{Q}$, $R_i$, $\mathcal{R}_i^{\mathcal{Q}}$)) $S$.send_results_to_client($C$, ($\mathcal{Q}$, $R_i$, $\mathcal{R}_i^{\mathcal{Q}}$))
\end{lstlisting} \end{lstlisting}
\end{listing}
\FloatBarrier
\section{Execution Loop in Pseudo-Code, declarative}
\begin{listing}[h] % h = here, t = top, b = bottom, p = page of floats
\caption{Distributed Query Execution Algorithm}
\begin{lstlisting}[language=Python]
# Distributed Query Execution with Agent Polling and Accumulated Results
# Define initial state
$\mathcal{R}$: set # Set of repositories
$\mathcal{Q}$: set # Set of queries
A: set # Set of agents
Q: list # Queue of $(\mathcal{Q}, \mathcal{R}_i)$ pairs
$\mathcal{R}_{\text{results}}$: dict = {} # Mapping of repositories to their accumulated query results
# Initialize result sets for each repository
$\mathcal{R}_{\text{results}}$ = {$\mathcal{R}_i$: set() for $\mathcal{R}_i$ in $\mathcal{R}$}
# Define job queue as an immutable mapping
Q = [($\mathcal{Q}$, $\mathcal{R}_i$) for $\mathcal{R}_i$ in $\mathcal{R}$]
# Processing as a declarative iteration over the job queue
def execute_queries(agents, job_queue, repository_results):
def available_agents():
return {$\alpha$ for $\alpha$ in agents if $\alpha$.is_available()}
def process_job($\mathcal{Q}$, $\mathcal{R}_i$, $\alpha$):
results = {$\mathcal{Q}_j$: $\alpha$.execute($\mathcal{Q}_j$, $\mathcal{R}_i$) for $\mathcal{Q}_j$ in $\mathcal{Q}$}
return $\mathcal{R}_i$, results
def accumulate_results($\mathcal{R}_{\text{results}}$, $\mathcal{R}_i$, query_results):
return {**$\mathcal{R}_{\text{results}}$, $\mathcal{R}_i$: $\mathcal{R}_{\text{results}}$[$\mathcal{R}_i$] | set().union(*query_results.values())}
while job_queue:
active_agents = available_agents()
for $\alpha$ in active_agents:
$\mathcal{Q}$, $\mathcal{R}_i$ = job_queue[0] # Peek at the first job
_, query_results = process_job($\mathcal{Q}$, $\mathcal{R}_i$, $\alpha$)
repository_results = accumulate_results(repository_results, $\mathcal{R}_i$, query_results)
$\alpha$.send_results(S, ($\mathcal{Q}$, $\mathcal{R}_i$, repository_results[$\mathcal{R}_i$]))
S.send_results_to_client(C, ($\mathcal{Q}$, $\mathcal{R}_i$, repository_results[$\mathcal{R}_i$]))
job_queue = job_queue[1:] # Move to the next job
return repository_results
# Execute the distributed query process
$\mathcal{R}_{\text{results}}$ = execute_queries(A, Q, $\mathcal{R}_{\text{results}}$)
\end{lstlisting}
\end{listing} \end{listing}
\FloatBarrier \FloatBarrier