diff options
-rw-r--r-- | ravanan/command-line-tool.scm | 14 | ||||
-rw-r--r-- | ravanan/propnet.scm | 96 | ||||
-rw-r--r-- | ravanan/workflow.scm | 47 |
3 files changed, 104 insertions, 53 deletions
diff --git a/ravanan/command-line-tool.scm b/ravanan/command-line-tool.scm index 14ab75c..c898430 100644 --- a/ravanan/command-line-tool.scm +++ b/ravanan/command-line-tool.scm @@ -85,11 +85,6 @@ (define %worker-node (file-append node "/bin/node")) -;; In batch systems that require it, poll job completion status every -;; 5 seconds. -(define %job-poll-interval - 5) - (define-immutable-record-type <scheduler-proc> (scheduler-proc name cwl scatter scatter-method) scheduler-proc? @@ -1167,11 +1162,4 @@ failed." (script->store-stderr-file script store)) (capture-command-line-tool-output script store)))) - (scheduler schedule - poll - (case batch-system - ;; Single machine jobs are run synchronously. So, there is no - ;; need to wait to poll them. - ((single-machine) 0) - ((slurm-api) %job-poll-interval)) - capture-output)) + (scheduler schedule poll capture-output)) diff --git a/ravanan/propnet.scm b/ravanan/propnet.scm index a8a549e..04ea31e 100644 --- a/ravanan/propnet.scm +++ b/ravanan/propnet.scm @@ -39,9 +39,10 @@ scheduler scheduler-schedule scheduler-poll - scheduler-poll-interval scheduler-capture-output - run-propnet)) + schedule-propnet + poll-propnet + capture-propnet-output)) (define-immutable-record-type <propnet> (propnet propagators value=? merge-values scheduler) @@ -61,13 +62,21 @@ (outputs propagator-outputs)) (define-immutable-record-type <scheduler> - (scheduler schedule poll poll-interval capture-output) + (scheduler schedule poll capture-output) scheduler? (schedule scheduler-schedule) (poll scheduler-poll) - (poll-interval scheduler-poll-interval) (capture-output scheduler-capture-output)) +(define-immutable-record-type <propnet-state> + (propnet-state propnet cells cells-inbox propagators-in-flight propagators-inbox) + propnet-state? + (propnet propnet-state-propnet) + (cells propnet-state-cells) + (cells-inbox propnet-state-cells-inbox) + (propagators-in-flight propnet-state-propagators-in-flight) + (propagators-inbox propnet-state-propagators-inbox)) + (define (activate-propagator schedule propagator inputs-alist) "Activate @var{propagator} with inputs from @var{inputs-alist}. If some required inputs are absent, do nothing. Schedule the propagator using @@ -98,9 +107,25 @@ exists, return @code{#f}. @var{val} is compared using @code{equal?}." name)) (propnet-propagators propnet))) -(define (run-propnet propnet initial-cell-values) - "Run @var{propnet} with @var{initial-cell-values} until cell values -stabilize." +(define (schedule-propnet propnet initial-cell-values) + "Start @var{propnet} with @var{initial-cell-values}, and return +@code{<propnet-state>} object." + (propnet-state propnet + (list) + initial-cell-values + (list) + ;; Pre-schedule all propagators to ensure we + ;; trigger those propagators that have no inputs + ;; at all. + (propnet-propagators propnet))) + +(define (poll-propnet state) + "Poll propagator network @var{state}. Return two values---a status symbol (either +@code{completed} or @code{pending}) and the current state of the propagator +network." + (define propnet + (propnet-state-propnet state)) + (define scheduler (propnet-scheduler propnet)) @@ -134,12 +159,10 @@ add to the inbox." ;; currently in flight. Each iteration of loop represents one state ;; transition. This is a very functional approach. Propagator network ;; implementations don't necessarily have to be mutational. - (let loop ((cells (list)) - (cell-values-inbox initial-cell-values) - ;; Pre-schedule all propagators to ensure we trigger those - ;; propagators that have no inputs at all. - (propagators-inbox (propnet-propagators propnet)) - (propagators-in-flight (list))) + (let loop ((cells (propnet-state-cells state)) + (cell-values-inbox (propnet-state-cells-inbox state)) + (propagators-inbox (propnet-state-propagators-inbox state)) + (propagators-in-flight (propnet-state-propagators-in-flight state))) (match cell-values-inbox ;; Process one new cell value in inbox. (((cell-name . new-cell-value) @@ -193,14 +216,18 @@ add to the inbox." ;; done. (() (match propagators-in-flight - ;; All propagators are finished. The propnet has - ;; stabilized. We are done. Return all cell values. + ;; All propagators are finished. The propnet has stabilized. We are + ;; done. Return all cell values. (() - cells) + (values 'completed + (propnet-state propnet + cells + cell-values-inbox + propagators-in-flight + propagators-inbox))) + ;; Propagators are still in flight. Check if any of them have + ;; completed. (_ - ;; Pause before polling so we don't bother the job server - ;; too often. - (sleep (scheduler-poll-interval scheduler)) (let ((finished-propagators propagators-still-in-flight (partition (match-lambda @@ -209,10 +236,27 @@ add to the inbox." state) 'completed))) propagators-in-flight))) - (loop cells - (apply assoc-set - cell-values-inbox - (append-map propagator-state->cell-values - finished-propagators)) - propagators-inbox - propagators-still-in-flight)))))))))) + (match finished-propagators + ;; None of the propagators we checked have completed. Return a + ;; pending state. + (() + (values 'pending + (propnet-state propnet + cells + cell-values-inbox + propagators-still-in-flight + propagators-inbox))) + ;; Some of the propagators we checked have completed. Enqueue + ;; their outputs in the cells inbox and loop. + (_ + (loop cells + (apply assoc-set + cell-values-inbox + (append-map propagator-state->cell-values + finished-propagators)) + propagators-inbox + propagators-still-in-flight)))))))))))) + +(define (capture-propnet-output state) + "Return output of propagator network @var{state}." + (propnet-state-cells state)) diff --git a/ravanan/workflow.scm b/ravanan/workflow.scm index 86f6160..0249a7e 100644 --- a/ravanan/workflow.scm +++ b/ravanan/workflow.scm @@ -19,6 +19,7 @@ (define-module (ravanan workflow) #:use-module (srfi srfi-1) #:use-module (srfi srfi-26) + #:use-module (srfi srfi-71) #:use-module (ice-9 filesystem) #:use-module (ice-9 match) #:use-module (ravanan command-line-tool) @@ -36,6 +37,11 @@ "SubworkflowFeatureRequirement" %command-line-tool-supported-requirements)) +;; In batch systems that require it, poll job completion status every +;; 5 seconds. +(define %job-poll-interval + 5) + (define (value=? maybe-val1 maybe-val2) "Return @code{#t} if maybe-monadic values @var{maybe-val1} and @var{maybe-val2} are the same value. Else, return @code{#f}." @@ -294,17 +300,30 @@ authenticate to the slurm API with. @var{slurm-api-endpoint} and (user-error "Required input `~a' not specified" input-id)))) (assoc-ref cwl "inputs")) - (let ((cell-values - (run-propnet - (propnet (workflow->propagators name cwl) - value=? - merge-values - (command-line-tool-scheduler - manifest scratch store batch-system - #:guix-daemon-socket guix-daemon-socket - #:slurm-api-endpoint slurm-api-endpoint - #:slurm-jwt slurm-jwt)) - inputs))) - ;; Capture outputs. - (vector-filter-map->list (cut capture-output cell-values <>) - (assoc-ref* cwl "outputs")))) + (let loop ((state (schedule-propnet + (propnet (workflow->propagators name cwl) + value=? + merge-values + (command-line-tool-scheduler + manifest scratch store batch-system + #:guix-daemon-socket guix-daemon-socket + #:slurm-api-endpoint slurm-api-endpoint + #:slurm-jwt slurm-jwt)) + inputs))) + ;; Poll. + (let ((status state (poll-propnet state))) + (if (eq? status 'pending) + (begin + ;; Pause before looping and polling again so we don't bother the job + ;; server too often. + (sleep (case batch-system + ;; Single machine jobs are run synchronously. So, there is + ;; no need to wait to poll them. + ((single-machine) 0) + ((slurm-api) %job-poll-interval))) + (loop state)) + ;; Capture outputs. + (vector-filter-map->list (cute capture-output + (capture-propnet-output state) + <>) + (assoc-ref* cwl "outputs")))))) |