about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- ravanan/command-line-tool.scm 14
-rw-r--r-- ravanan/propnet.scm 96
-rw-r--r-- ravanan/workflow.scm 47
3 files changed, 104 insertions, 53 deletions
diff --git a/ravanan/command-line-tool.scm b/ravanan/command-line-tool.scm
index 14ab75c..c898430 100644
--- a/ravanan/command-line-tool.scm
+++ b/ravanan/command-line-tool.scm
@@ -85,11 +85,6 @@
(define %worker-node
(file-append node "/bin/node"))
-;; In batch systems that require it, poll job completion status every
-;; 5 seconds.
-(define %job-poll-interval
- 5)
-
(define-immutable-record-type <scheduler-proc>
(scheduler-proc name cwl scatter scatter-method)
scheduler-proc?
@@ -1167,11 +1162,4 @@ failed."
(script->store-stderr-file script store))
(capture-command-line-tool-output script store))))
- (scheduler schedule
- poll
- (case batch-system
- ;; Single machine jobs are run synchronously. So, there is no
- ;; need to wait to poll them.
- ((single-machine) 0)
- ((slurm-api) %job-poll-interval))
- capture-output))
+ (scheduler schedule poll capture-output))
diff --git a/ravanan/propnet.scm b/ravanan/propnet.scm
index a8a549e..04ea31e 100644
--- a/ravanan/propnet.scm
+++ b/ravanan/propnet.scm
@@ -39,9 +39,10 @@
scheduler
scheduler-schedule
scheduler-poll
- scheduler-poll-interval
scheduler-capture-output
- run-propnet))
+ schedule-propnet
+ poll-propnet
+ capture-propnet-output))
(define-immutable-record-type <propnet>
(propnet propagators value=? merge-values scheduler)
@@ -61,13 +62,21 @@
(outputs propagator-outputs))
(define-immutable-record-type <scheduler>
- (scheduler schedule poll poll-interval capture-output)
+ (scheduler schedule poll capture-output)
scheduler?
(schedule scheduler-schedule)
(poll scheduler-poll)
- (poll-interval scheduler-poll-interval)
(capture-output scheduler-capture-output))
+(define-immutable-record-type <propnet-state> ; state of a propnet run, carried from one poll-propnet call to the next
+ (propnet-state propnet cells cells-inbox propagators-in-flight propagators-inbox)
+ propnet-state?
+ (propnet propnet-state-propnet) ; the propagator network this state belongs to
+ (cells propnet-state-cells) ; cell values held so far; returned by capture-propnet-output
+ (cells-inbox propnet-state-cells-inbox) ; incoming cell values not yet processed
+ (propagators-in-flight propnet-state-propagators-in-flight) ; propagators not yet seen as completed
+ (propagators-inbox propnet-state-propagators-inbox)) ; propagators queued in the inbox; initially all of them
+
(define (activate-propagator schedule propagator inputs-alist)
"Activate @var{propagator} with inputs from @var{inputs-alist}. If some
required inputs are absent, do nothing. Schedule the propagator using
@@ -98,9 +107,25 @@ exists, return @code{#f}. @var{val} is compared using @code{equal?}."
name))
(propnet-propagators propnet)))
-(define (run-propnet propnet initial-cell-values)
- "Run @var{propnet} with @var{initial-cell-values} until cell values
-stabilize."
+(define (schedule-propnet propnet initial-cell-values)
+ "Start @var{propnet} with @var{initial-cell-values}, and return
+a @code{<propnet-state>} object."
+ (propnet-state propnet
+ (list) ; cells: none yet
+ initial-cell-values ; cells-inbox: initial values enter through the inbox
+ (list) ; propagators-in-flight: none yet
+ ;; Pre-schedule all propagators to ensure we
+ ;; trigger those propagators that have no inputs
+ ;; at all.
+ (propnet-propagators propnet))) ; propagators-inbox
+
+(define (poll-propnet state)
+ "Poll propagator network @var{state}. Return two values---a status symbol (either
+@code{completed} or @code{pending}) and the current state of the propagator
+network."
+ (define propnet
+ (propnet-state-propnet state))
+
(define scheduler
(propnet-scheduler propnet))
@@ -134,12 +159,10 @@ add to the inbox."
;; currently in flight. Each iteration of loop represents one state
;; transition. This is a very functional approach. Propagator network
;; implementations don't necessarily have to be mutational.
- (let loop ((cells (list))
- (cell-values-inbox initial-cell-values)
- ;; Pre-schedule all propagators to ensure we trigger those
- ;; propagators that have no inputs at all.
- (propagators-inbox (propnet-propagators propnet))
- (propagators-in-flight (list)))
+ (let loop ((cells (propnet-state-cells state))
+ (cell-values-inbox (propnet-state-cells-inbox state))
+ (propagators-inbox (propnet-state-propagators-inbox state))
+ (propagators-in-flight (propnet-state-propagators-in-flight state)))
(match cell-values-inbox
;; Process one new cell value in inbox.
(((cell-name . new-cell-value)
@@ -193,14 +216,18 @@ add to the inbox."
;; done.
(()
(match propagators-in-flight
- ;; All propagators are finished. The propnet has
- ;; stabilized. We are done. Return all cell values.
+ ;; All propagators are finished. The propnet has stabilized. We are
+ ;; done. Return all cell values.
(()
- cells)
+ (values 'completed
+ (propnet-state propnet
+ cells
+ cell-values-inbox
+ propagators-in-flight
+ propagators-inbox)))
+ ;; Propagators are still in flight. Check if any of them have
+ ;; completed.
(_
- ;; Pause before polling so we don't bother the job server
- ;; too often.
- (sleep (scheduler-poll-interval scheduler))
(let ((finished-propagators
propagators-still-in-flight
(partition (match-lambda
@@ -209,10 +236,27 @@ add to the inbox."
state)
'completed)))
propagators-in-flight)))
- (loop cells
- (apply assoc-set
- cell-values-inbox
- (append-map propagator-state->cell-values
- finished-propagators))
- propagators-inbox
- propagators-still-in-flight))))))))))
+ (match finished-propagators
+ ;; None of the propagators we checked have completed. Return a
+ ;; pending state.
+ (()
+ (values 'pending
+ (propnet-state propnet
+ cells
+ cell-values-inbox
+ propagators-still-in-flight
+ propagators-inbox)))
+ ;; Some of the propagators we checked have completed. Enqueue
+ ;; their outputs in the cells inbox and loop.
+ (_
+ (loop cells
+ (apply assoc-set
+ cell-values-inbox
+ (append-map propagator-state->cell-values
+ finished-propagators))
+ propagators-inbox
+ propagators-still-in-flight))))))))))))
+
+(define (capture-propnet-output state)
+ "Return output of propagator network @var{state}."
+ (propnet-state-cells state)) ; the output is the cells field of the <propnet-state> record
diff --git a/ravanan/workflow.scm b/ravanan/workflow.scm
index 86f6160..0249a7e 100644
--- a/ravanan/workflow.scm
+++ b/ravanan/workflow.scm
@@ -19,6 +19,7 @@
(define-module (ravanan workflow)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-26)
+ #:use-module (srfi srfi-71)
#:use-module (ice-9 filesystem)
#:use-module (ice-9 match)
#:use-module (ravanan command-line-tool)
@@ -36,6 +37,11 @@
"SubworkflowFeatureRequirement"
%command-line-tool-supported-requirements))
+;; In batch systems that require it, poll job completion status every
+;; 5 seconds.
+(define %job-poll-interval
+ 5) ; seconds; passed to sleep between polls when batch-system is slurm-api
+
(define (value=? maybe-val1 maybe-val2)
"Return @code{#t} if maybe-monadic values @var{maybe-val1} and
@var{maybe-val2} are the same value. Else, return @code{#f}."
@@ -294,17 +300,30 @@ authenticate to the slurm API with. @var{slurm-api-endpoint} and
(user-error "Required input `~a' not specified"
input-id))))
(assoc-ref cwl "inputs"))
- (let ((cell-values
- (run-propnet
- (propnet (workflow->propagators name cwl)
- value=?
- merge-values
- (command-line-tool-scheduler
- manifest scratch store batch-system
- #:guix-daemon-socket guix-daemon-socket
- #:slurm-api-endpoint slurm-api-endpoint
- #:slurm-jwt slurm-jwt))
- inputs)))
- ;; Capture outputs.
- (vector-filter-map->list (cut capture-output cell-values <>)
- (assoc-ref* cwl "outputs"))))
+ (let loop ((state (schedule-propnet
+ (propnet (workflow->propagators name cwl)
+ value=?
+ merge-values
+ (command-line-tool-scheduler
+ manifest scratch store batch-system
+ #:guix-daemon-socket guix-daemon-socket
+ #:slurm-api-endpoint slurm-api-endpoint
+ #:slurm-jwt slurm-jwt))
+ inputs)))
+ ;; Poll.
+ (let ((status state (poll-propnet state)))
+ (if (eq? status 'pending)
+ (begin
+ ;; Pause before looping and polling again so we don't bother the job
+ ;; server too often.
+ (sleep (case batch-system
+ ;; Single machine jobs are run synchronously. So, there is
+ ;; no need to wait to poll them.
+ ((single-machine) 0)
+ ((slurm-api) %job-poll-interval)))
+ (loop state))
+ ;; Capture outputs.
+ (vector-filter-map->list (cute capture-output
+ (capture-propnet-output state)
+ <>)
+ (assoc-ref* cwl "outputs"))))))