From 39b46001f4984f3ab8cf30a974cd8744ef9dfc73 Mon Sep 17 00:00:00 2001 From: Arun Isaac Date: Mon, 23 Sep 2024 23:43:33 +0100 Subject: propnet: Present a polling interface to callers. (ravanan propnet) now presents a polling interface to the outside world. Instead of computing the entire propagator network in one function call, it needs to be polled repeatedly until it is done. Hence, the polling logic (such as the polling interval) needs to be outside (ravanan propnet). * ravanan/propnet.scm ()[poll-interval]: Delete field. (): New record type. (schedule-propnet, capture-propnet-output): New public functions. (run-propnet): Rename to poll-propnet; return status and state values instead of fulling computing the propnet. * ravanan/command-line-tool.scm (%job-poll-interval): Move to (ravanan workflow). (command-line-tool-scheduler): Do not initialize poll-interval in scheduler. * ravanan/workflow.scm: Import (srfi srfi-71). (run-workflow): Use the new polling interface to the propnet. --- ravanan/command-line-tool.scm | 14 +------ ravanan/propnet.scm | 96 +++++++++++++++++++++++++++++++------------ ravanan/workflow.scm | 47 ++++++++++++++------- 3 files changed, 104 insertions(+), 53 deletions(-) diff --git a/ravanan/command-line-tool.scm b/ravanan/command-line-tool.scm index 14ab75c..c898430 100644 --- a/ravanan/command-line-tool.scm +++ b/ravanan/command-line-tool.scm @@ -85,11 +85,6 @@ (define %worker-node (file-append node "/bin/node")) -;; In batch systems that require it, poll job completion status every -;; 5 seconds. -(define %job-poll-interval - 5) - (define-immutable-record-type (scheduler-proc name cwl scatter scatter-method) scheduler-proc? @@ -1167,11 +1162,4 @@ failed." (script->store-stderr-file script store)) (capture-command-line-tool-output script store)))) - (scheduler schedule - poll - (case batch-system - ;; Single machine jobs are run synchronously. So, there is no - ;; need to wait to poll them. - ((single-machine) 0) - ((slurm-api) %job-poll-interval)) - capture-output)) + (scheduler schedule poll capture-output)) diff --git a/ravanan/propnet.scm b/ravanan/propnet.scm index a8a549e..04ea31e 100644 --- a/ravanan/propnet.scm +++ b/ravanan/propnet.scm @@ -39,9 +39,10 @@ scheduler scheduler-schedule scheduler-poll - scheduler-poll-interval scheduler-capture-output - run-propnet)) + schedule-propnet + poll-propnet + capture-propnet-output)) (define-immutable-record-type (propnet propagators value=? merge-values scheduler) @@ -61,13 +62,21 @@ (outputs propagator-outputs)) (define-immutable-record-type - (scheduler schedule poll poll-interval capture-output) + (scheduler schedule poll capture-output) scheduler? (schedule scheduler-schedule) (poll scheduler-poll) - (poll-interval scheduler-poll-interval) (capture-output scheduler-capture-output)) +(define-immutable-record-type + (propnet-state propnet cells cells-inbox propagators-in-flight propagators-inbox) + propnet-state? + (propnet propnet-state-propnet) + (cells propnet-state-cells) + (cells-inbox propnet-state-cells-inbox) + (propagators-in-flight propnet-state-propagators-in-flight) + (propagators-inbox propnet-state-propagators-inbox)) + (define (activate-propagator schedule propagator inputs-alist) "Activate @var{propagator} with inputs from @var{inputs-alist}. If some required inputs are absent, do nothing. Schedule the propagator using @@ -98,9 +107,25 @@ exists, return @code{#f}. @var{val} is compared using @code{equal?}." name)) (propnet-propagators propnet))) -(define (run-propnet propnet initial-cell-values) - "Run @var{propnet} with @var{initial-cell-values} until cell values -stabilize." +(define (schedule-propnet propnet initial-cell-values) + "Start @var{propnet} with @var{initial-cell-values}, and return +@code{} object." + (propnet-state propnet + (list) + initial-cell-values + (list) + ;; Pre-schedule all propagators to ensure we + ;; trigger those propagators that have no inputs + ;; at all. + (propnet-propagators propnet))) + +(define (poll-propnet state) + "Poll propagator network @var{state}. Return two values---a status symbol (either +@code{completed} or @code{pending}) and the current state of the propagator +network." + (define propnet + (propnet-state-propnet state)) + (define scheduler (propnet-scheduler propnet)) @@ -134,12 +159,10 @@ add to the inbox." ;; currently in flight. Each iteration of loop represents one state ;; transition. This is a very functional approach. Propagator network ;; implementations don't necessarily have to be mutational. - (let loop ((cells (list)) - (cell-values-inbox initial-cell-values) - ;; Pre-schedule all propagators to ensure we trigger those - ;; propagators that have no inputs at all. - (propagators-inbox (propnet-propagators propnet)) - (propagators-in-flight (list))) + (let loop ((cells (propnet-state-cells state)) + (cell-values-inbox (propnet-state-cells-inbox state)) + (propagators-inbox (propnet-state-propagators-inbox state)) + (propagators-in-flight (propnet-state-propagators-in-flight state))) (match cell-values-inbox ;; Process one new cell value in inbox. (((cell-name . new-cell-value) @@ -193,14 +216,18 @@ add to the inbox." ;; done. (() (match propagators-in-flight - ;; All propagators are finished. The propnet has - ;; stabilized. We are done. Return all cell values. + ;; All propagators are finished. The propnet has stabilized. We are + ;; done. Return all cell values. (() - cells) + (values 'completed + (propnet-state propnet + cells + cell-values-inbox + propagators-in-flight + propagators-inbox))) + ;; Propagators are still in flight. Check if any of them have + ;; completed. (_ - ;; Pause before polling so we don't bother the job server - ;; too often. - (sleep (scheduler-poll-interval scheduler)) (let ((finished-propagators propagators-still-in-flight (partition (match-lambda @@ -209,10 +236,27 @@ add to the inbox." state) 'completed))) propagators-in-flight))) - (loop cells - (apply assoc-set - cell-values-inbox - (append-map propagator-state->cell-values - finished-propagators)) - propagators-inbox - propagators-still-in-flight)))))))))) + (match finished-propagators + ;; None of the propagators we checked have completed. Return a + ;; pending state. + (() + (values 'pending + (propnet-state propnet + cells + cell-values-inbox + propagators-still-in-flight + propagators-inbox))) + ;; Some of the propagators we checked have completed. Enqueue + ;; their outputs in the cells inbox and loop. + (_ + (loop cells + (apply assoc-set + cell-values-inbox + (append-map propagator-state->cell-values + finished-propagators)) + propagators-inbox + propagators-still-in-flight)))))))))))) + +(define (capture-propnet-output state) + "Return output of propagator network @var{state}." + (propnet-state-cells state)) diff --git a/ravanan/workflow.scm b/ravanan/workflow.scm index 86f6160..0249a7e 100644 --- a/ravanan/workflow.scm +++ b/ravanan/workflow.scm @@ -19,6 +19,7 @@ (define-module (ravanan workflow) #:use-module (srfi srfi-1) #:use-module (srfi srfi-26) + #:use-module (srfi srfi-71) #:use-module (ice-9 filesystem) #:use-module (ice-9 match) #:use-module (ravanan command-line-tool) @@ -36,6 +37,11 @@ "SubworkflowFeatureRequirement" %command-line-tool-supported-requirements)) +;; In batch systems that require it, poll job completion status every +;; 5 seconds. +(define %job-poll-interval + 5) + (define (value=? maybe-val1 maybe-val2) "Return @code{#t} if maybe-monadic values @var{maybe-val1} and @var{maybe-val2} are the same value. Else, return @code{#f}." @@ -294,17 +300,30 @@ authenticate to the slurm API with. @var{slurm-api-endpoint} and (user-error "Required input `~a' not specified" input-id)))) (assoc-ref cwl "inputs")) - (let ((cell-values - (run-propnet - (propnet (workflow->propagators name cwl) - value=? - merge-values - (command-line-tool-scheduler - manifest scratch store batch-system - #:guix-daemon-socket guix-daemon-socket - #:slurm-api-endpoint slurm-api-endpoint - #:slurm-jwt slurm-jwt)) - inputs))) - ;; Capture outputs. - (vector-filter-map->list (cut capture-output cell-values <>) - (assoc-ref* cwl "outputs")))) + (let loop ((state (schedule-propnet + (propnet (workflow->propagators name cwl) + value=? + merge-values + (command-line-tool-scheduler + manifest scratch store batch-system + #:guix-daemon-socket guix-daemon-socket + #:slurm-api-endpoint slurm-api-endpoint + #:slurm-jwt slurm-jwt)) + inputs))) + ;; Poll. + (let ((status state (poll-propnet state))) + (if (eq? status 'pending) + (begin + ;; Pause before looping and polling again so we don't bother the job + ;; server too often. + (sleep (case batch-system + ;; Single machine jobs are run synchronously. So, there is + ;; no need to wait to poll them. + ((single-machine) 0) + ((slurm-api) %job-poll-interval))) + (loop state)) + ;; Capture outputs. + (vector-filter-map->list (cute capture-output + (capture-propnet-output state) + <>) + (assoc-ref* cwl "outputs")))))) -- cgit v1.2.3