about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- ravanan/command-line-tool.scm 14
-rw-r--r-- ravanan/propnet.scm 96
-rw-r--r-- ravanan/workflow.scm 47
3 files changed, 104 insertions, 53 deletions
diff --git a/ravanan/command-line-tool.scm b/ravanan/command-line-tool.scm
index 14ab75c..c898430 100644
--- a/ravanan/command-line-tool.scm
+++ b/ravanan/command-line-tool.scm
@@ -85,11 +85,6 @@
 (define %worker-node
   (file-append node "/bin/node"))
 
-;; In batch systems that require it, poll job completion status every
-;; 5 seconds.
-(define %job-poll-interval
-  5)
-
 (define-immutable-record-type <scheduler-proc>
   (scheduler-proc name cwl scatter scatter-method)
   scheduler-proc?
@@ -1167,11 +1162,4 @@ failed."
                   (script->store-stderr-file script store))
           (capture-command-line-tool-output script store))))
 
-  (scheduler schedule
-             poll
-             (case batch-system
-               ;; Single machine jobs are run synchronously. So, there is no
-               ;; need to wait to poll them.
-               ((single-machine) 0)
-               ((slurm-api) %job-poll-interval))
-             capture-output))
+  (scheduler schedule poll capture-output))
diff --git a/ravanan/propnet.scm b/ravanan/propnet.scm
index a8a549e..04ea31e 100644
--- a/ravanan/propnet.scm
+++ b/ravanan/propnet.scm
@@ -39,9 +39,10 @@
             scheduler
             scheduler-schedule
             scheduler-poll
-            scheduler-poll-interval
             scheduler-capture-output
-            run-propnet))
+            schedule-propnet
+            poll-propnet
+            capture-propnet-output))
 
 (define-immutable-record-type <propnet>
   (propnet propagators value=? merge-values scheduler)
@@ -61,13 +62,21 @@
   (outputs propagator-outputs))
 
 (define-immutable-record-type <scheduler>
-  (scheduler schedule poll poll-interval capture-output)
+  (scheduler schedule poll capture-output)
   scheduler?
   (schedule scheduler-schedule)
   (poll scheduler-poll)
-  (poll-interval scheduler-poll-interval)
   (capture-output scheduler-capture-output))
 
+(define-immutable-record-type <propnet-state>
+  (propnet-state propnet cells cells-inbox propagators-in-flight propagators-inbox)
+  propnet-state?
+  (propnet propnet-state-propnet)         ; network this state belongs to
+  (cells propnet-state-cells)             ; cell values; the network's output
+  (cells-inbox propnet-state-cells-inbox) ; new cell values not yet processed
+  (propagators-in-flight propnet-state-propagators-in-flight) ; completion pending
+  (propagators-inbox propnet-state-propagators-inbox)) ; queued to be triggered
+
 (define (activate-propagator schedule propagator inputs-alist)
   "Activate @var{propagator} with inputs from @var{inputs-alist}. If some
 required inputs are absent, do nothing. Schedule the propagator using
@@ -98,9 +107,25 @@ exists, return @code{#f}. @var{val} is compared using @code{equal?}."
                   name))
         (propnet-propagators propnet)))
 
-(define (run-propnet propnet initial-cell-values)
-  "Run @var{propnet} with @var{initial-cell-values} until cell values
-stabilize."
+(define (schedule-propnet propnet initial-cell-values)
+  "Start @var{propnet} with @var{initial-cell-values}, and return a fresh
+@code{<propnet-state>} object. Nothing is run here: the state begins with no
+cells, @var{initial-cell-values} in the cells inbox and nothing in flight."
+  ;; Pre-schedule all propagators to ensure we trigger those propagators
+  ;; that have no inputs at all.
+  (let ((no-cells (list))
+        (none-in-flight (list))
+        (all-propagators (propnet-propagators propnet)))
+    (propnet-state propnet no-cells initial-cell-values
+                   none-in-flight all-propagators)))
+
+(define (poll-propnet state)
+  "Poll propagator network @var{state}. Return two values---a status symbol (either
+@code{completed} or @code{pending}) and the current state of the propagator
+network."
+  (define propnet
+    (propnet-state-propnet state))
+
   (define scheduler
     (propnet-scheduler propnet))
 
@@ -134,12 +159,10 @@ add to the inbox."
   ;; currently in flight. Each iteration of loop represents one state
   ;; transition. This is a very functional approach. Propagator network
   ;; implementations don't necessarily have to be mutational.
-  (let loop ((cells (list))
-             (cell-values-inbox initial-cell-values)
-             ;; Pre-schedule all propagators to ensure we trigger those
-             ;; propagators that have no inputs at all.
-             (propagators-inbox (propnet-propagators propnet))
-             (propagators-in-flight (list)))
+  (let loop ((cells (propnet-state-cells state))
+             (cell-values-inbox (propnet-state-cells-inbox state))
+             (propagators-inbox (propnet-state-propagators-inbox state))
+             (propagators-in-flight (propnet-state-propagators-in-flight state)))
     (match cell-values-inbox
       ;; Process one new cell value in inbox.
       (((cell-name . new-cell-value)
@@ -193,14 +216,18 @@ add to the inbox."
          ;; done.
          (()
           (match propagators-in-flight
-            ;; All propagators are finished. The propnet has
-            ;; stabilized. We are done. Return all cell values.
+            ;; All propagators are finished. The propnet has stabilized. We are
+            ;; done. Return all cell values.
             (()
-             cells)
+             (values 'completed
+                     (propnet-state propnet
+                                    cells
+                                    cell-values-inbox
+                                    propagators-in-flight
+                                    propagators-inbox)))
+            ;; Propagators are still in flight. Check if any of them have
+            ;; completed.
             (_
-             ;; Pause before polling so we don't bother the job server
-             ;; too often.
-             (sleep (scheduler-poll-interval scheduler))
              (let ((finished-propagators
                     propagators-still-in-flight
                     (partition (match-lambda
@@ -209,10 +236,27 @@ add to the inbox."
                                         state)
                                        'completed)))
                                propagators-in-flight)))
-               (loop cells
-                     (apply assoc-set
-                            cell-values-inbox
-                            (append-map propagator-state->cell-values
-                                        finished-propagators))
-                     propagators-inbox
-                     propagators-still-in-flight))))))))))
+               (match finished-propagators
+                 ;; None of the propagators we checked have completed. Return a
+                 ;; pending state.
+                 (()
+                  (values 'pending
+                          (propnet-state propnet
+                                         cells
+                                         cell-values-inbox
+                                         propagators-still-in-flight
+                                         propagators-inbox)))
+                 ;; Some of the propagators we checked have completed. Enqueue
+                 ;; their outputs in the cells inbox and loop.
+                 (_
+                  (loop cells
+                        (apply assoc-set
+                               cell-values-inbox
+                               (append-map propagator-state->cell-values
+                                           finished-propagators))
+                        propagators-inbox
+                        propagators-still-in-flight))))))))))))
+
+(define (capture-propnet-output state)
+  "Return the output of propagator network @var{state}, that is, its cells."
+  (propnet-state-cells state))
diff --git a/ravanan/workflow.scm b/ravanan/workflow.scm
index 86f6160..0249a7e 100644
--- a/ravanan/workflow.scm
+++ b/ravanan/workflow.scm
@@ -19,6 +19,7 @@
 (define-module (ravanan workflow)
   #:use-module (srfi srfi-1)
   #:use-module (srfi srfi-26)
+  #:use-module (srfi srfi-71)
   #:use-module (ice-9 filesystem)
   #:use-module (ice-9 match)
   #:use-module (ravanan command-line-tool)
@@ -36,6 +37,11 @@
          "SubworkflowFeatureRequirement"
          %command-line-tool-supported-requirements))
 
+;; Number of seconds to sleep between successive polls of job
+;; completion status, in batch systems that require polling.
+(define %job-poll-interval
+  5)
+
 (define (value=? maybe-val1 maybe-val2)
   "Return @code{#t} if maybe-monadic values @var{maybe-val1} and
 @var{maybe-val2} are the same value. Else, return @code{#f}."
@@ -294,17 +300,30 @@ authenticate to the slurm API with. @var{slurm-api-endpoint} and
                          (user-error "Required input `~a' not specified"
                                      input-id))))
                    (assoc-ref cwl "inputs"))
-  (let ((cell-values
-         (run-propnet
-          (propnet (workflow->propagators name cwl)
-                   value=?
-                   merge-values
-                   (command-line-tool-scheduler
-                    manifest scratch store batch-system
-                    #:guix-daemon-socket guix-daemon-socket
-                    #:slurm-api-endpoint slurm-api-endpoint
-                    #:slurm-jwt slurm-jwt))
-          inputs)))
-    ;; Capture outputs.
-    (vector-filter-map->list (cut capture-output cell-values <>)
-                             (assoc-ref* cwl "outputs"))))
+  (let loop ((state (schedule-propnet
+                     (propnet (workflow->propagators name cwl)
+                              value=?
+                              merge-values
+                              (command-line-tool-scheduler
+                               manifest scratch store batch-system
+                               #:guix-daemon-socket guix-daemon-socket
+                               #:slurm-api-endpoint slurm-api-endpoint
+                               #:slurm-jwt slurm-jwt))
+                     inputs)))
+    ;; Poll.
+    (let ((status state (poll-propnet state)))
+      (if (eq? status 'pending)
+          (begin
+            ;; Pause before looping and polling again so we don't bother the job
+            ;; server too often.
+            (sleep (case batch-system
+                     ;; Single machine jobs are run synchronously. So, there is
+                     ;; no need to wait to poll them.
+                     ((single-machine) 0)
+                     ((slurm-api) %job-poll-interval)))
+            (loop state))
+          ;; Capture outputs.
+          (vector-filter-map->list (cute capture-output
+                                         (capture-propnet-output state)
+                                         <>)
+                                   (assoc-ref* cwl "outputs"))))))