summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--ravanan/job-state.scm33
-rw-r--r--ravanan/propnet.scm298
-rw-r--r--ravanan/workflow.scm160
3 files changed, 268 insertions, 223 deletions
diff --git a/ravanan/job-state.scm b/ravanan/job-state.scm
index aa709f0..a769698 100644
--- a/ravanan/job-state.scm
+++ b/ravanan/job-state.scm
@@ -57,28 +57,29 @@
    state))
 
 (define* (job-state-status state batch-system)
-  "Return current status and updated state of job with @var{state} on
-@var{batch-system}. The status is one of the symbols @code{completed},
-@code{failed} or @code{pending}."
+  "Return current status of job with @var{state} on @var{batch-system}. The status
+is one of the symbols @code{completed}, @code{failed} or @code{pending}
+encapsulated in the state monad."
   (cond
    ;; Single machine jobs are run synchronously. So, they return success or
    ;; failure immediately.
    ((single-machine-job-state? state)
-    (if (single-machine-job-state-success? state)
-        'completed
-        'failed))
+    (state-return
+     (if (single-machine-job-state-success? state)
+         'completed
+         'failed)))
    ;; Poll slurm for job state.
    ((slurm-job-state? state)
-    (run-with-state
-     (job-state (slurm-job-state-job-id state)
-                #:api-endpoint (slurm-api-batch-system-endpoint batch-system)
-                #:jwt (slurm-api-batch-system-jwt batch-system))))
+    (job-state (slurm-job-state-job-id state)
+               #:api-endpoint (slurm-api-batch-system-endpoint batch-system)
+               #:jwt (slurm-api-batch-system-jwt batch-system)))
    ;; For list states, poll each state element and return 'completed only if all
    ;; state elements have completed.
    ((list? state)
-    (or (every (lambda (state-element)
-                 (case (job-state-status state-element batch-system)
-                   ((completed) => identity)
-                   (else #f)))
-               state)
-        'pending))))
+    (state-return
+     (or (every (lambda (state-element)
+                  (case (job-state-status state-element batch-system)
+                    ((completed) => identity)
+                    (else #f)))
+                state)
+         'pending)))))
diff --git a/ravanan/propnet.scm b/ravanan/propnet.scm
index 360aea4..34624b5 100644
--- a/ravanan/propnet.scm
+++ b/ravanan/propnet.scm
@@ -20,6 +20,7 @@
   #:use-module ((rnrs base) #:select (assertion-violation))
   #:use-module (srfi srfi-1)
   #:use-module (srfi srfi-9 gnu)
+  #:use-module (srfi srfi-26)
   #:use-module (srfi srfi-71)
   #:use-module (ice-9 match)
   #:use-module (ravanan work monads)
@@ -124,21 +125,22 @@ exists, return @code{#f}. @var{val} is compared using @code{equal?}."
         (propnet-propagators propnet)))
 
 (define (schedule-propnet propnet initial-cell-values)
-  "Start @var{propnet} with @var{initial-cell-values}, and return
+  "Start @var{propnet} with @var{initial-cell-values}, and return a state-monadic
 @code{<propnet-state>} object."
-  (propnet-state propnet
-                 (list)
-                 initial-cell-values
-                 (list)
-                 ;; Pre-schedule all propagators to ensure we
-                 ;; trigger those propagators that have no inputs
-                 ;; at all.
-                 (propnet-propagators propnet)))
+  (state-return
+   (propnet-state propnet
+                  (state-return (list))
+                  (state-return initial-cell-values)
+                  (state-return (list))
+                  ;; Pre-schedule all propagators to ensure we trigger those
+                  ;; propagators that have no inputs at all.
+                  (state-return (propnet-propagators propnet)))))
 
 (define (poll-propnet state)
-  "Poll propagator network @var{state}. Return a @code{<state+status>} object
-containing two values---the updated state of the propagator network and a status
-symbol (either @code{completed} or @code{pending})."
+  "Poll propagator network @var{state}. Return a state-monadic
+@code{<state+status>} object containing two values---the current state of the
+propagator network and a status symbol (either @code{completed} or
+@code{pending})."
   (define propnet
     (propnet-state-propnet state))
 
@@ -149,17 +151,19 @@ symbol (either @code{completed} or @code{pending})."
     (match-lambda
       ((propagator-name . state)
        "Convert the output of a completed propagator into new cell values to
-add to the inbox."
+add to the inbox. The return value is a state-monadic list."
        (let ((output-mapping
               (propagator-outputs
                (find-propagator-name propnet propagator-name))))
-         (map (match-lambda
-                ((output-name . value)
-                 (cons (or (assoc-ref output-mapping output-name)
-                           (assertion-violation output-name "Unknown output"))
-                       value)))
-              ((scheduler-capture-output scheduler)
-               state))))))
+         (state-let* ((captured-outputs ((scheduler-capture-output scheduler)
+                                         state)))
+           (state-return
+            (map (match-lambda
+                   ((output-name . value)
+                    (cons (or (assoc-ref output-mapping output-name)
+                              (assertion-violation output-name "Unknown output"))
+                          value)))
+                 captured-outputs)))))))
 
   (define (propagator-input-values cells propagator)
     "Return input values for @var{propagator} extracted from @var{cells}."
@@ -173,129 +177,155 @@ add to the inbox."
   (define (schedule-propagators propagators cells)
     "Schedule all propagators among @var{propagators} whose inputs are present in
 @var{cells}. Return an association list mapping scheduled propagator names to
-their states."
-    (append-map (lambda (propagator)
-                  (maybe-alist
-                   (cons (propagator-name propagator)
-                         (maybe-let* ((propagator-state
-                                       (activate-propagator
-                                        scheduler
-                                        propagator
-                                        (propagator-input-values cells propagator))))
-                           (just (run-with-state propagator-state))))))
-                propagators))
+their monadic states."
+    (state-map (match-lambda
+                 ((name . mpropagator-state)
+                  (state-let* ((propagator-state mpropagator-state))
+                    (state-return (cons name propagator-state)))))
+               (append-map (lambda (propagator)
+                             (maybe-alist
+                              (cons (propagator-name propagator)
+                                    (activate-propagator
+                                     scheduler
+                                     propagator
+                                     (propagator-input-values cells propagator)))))
+                           propagators)))
 
   ;; We implement propagator networks as a state machine. The state consists of
   ;; the current values of all the cells and the list of all propagators
   ;; currently in flight. Each iteration of loop represents one state
   ;; transition. This is a very functional approach. Propagator network
   ;; implementations don't necessarily have to be mutational.
-  (let loop ((cells (propnet-state-cells state))
-             (cell-values-inbox (propnet-state-cells-inbox state))
-             (propagators-inbox (propnet-state-propagators-inbox state))
-             (propagators-in-flight (propnet-state-propagators-in-flight state)))
-    (match cell-values-inbox
-      ;; Process one new cell value in inbox.
-      (((cell-name . new-cell-value)
-        tail-cell-values-inbox ...)
-       (if ((propnet-value=? propnet)
-            (maybe-assoc-ref (just cells) cell-name)
-            (just new-cell-value))
-           ;; It's the same value. Nothing to do.
-           (loop cells
-                 tail-cell-values-inbox
-                 propagators-inbox
-                 propagators-in-flight)
-           ;; Update the cell and activate propagators.
-           (let ((cells (maybe-assoc-set cells
-                          (cons cell-name
-                                ((propnet-merge-values propnet)
-                                 (maybe-assoc-ref (just cells) cell-name)
-                                 (just new-cell-value))))))
-             (loop cells
-                   tail-cell-values-inbox
+  (let loop ((mcells (propnet-state-cells state))
+             (mcell-values-inbox (propnet-state-cells-inbox state))
+             (mpropagators-inbox (propnet-state-propagators-inbox state))
+             (mpropagators-in-flight (propnet-state-propagators-in-flight state)))
+    (state-let* ((cells mcells)
+                 (cell-values-inbox mcell-values-inbox)
+                 (propagators-inbox mpropagators-inbox)
+                 (propagators-in-flight mpropagators-in-flight))
+      (match cell-values-inbox
+        ;; Process one new cell value in inbox.
+        (((cell-name . new-cell-value)
+          tail-cell-values-inbox ...)
+         (if ((propnet-value=? propnet)
+              (maybe-assoc-ref (just cells) cell-name)
+              (just new-cell-value))
+             ;; It's the same value. Nothing to do.
+             (loop mcells
+                   (state-return tail-cell-values-inbox)
+                   mpropagators-inbox
+                   mpropagators-in-flight)
+             ;; Update the cell and activate propagators.
+             (loop (state-return (maybe-assoc-set cells
+                                   (cons cell-name
+                                         ((propnet-merge-values propnet)
+                                          (maybe-assoc-ref (just cells) cell-name)
+                                          (just new-cell-value)))))
+                   (state-return tail-cell-values-inbox)
                    ;; Enqueue propagators that depend on cell. Union to avoid
                    ;; scheduling the same propagator more than once.
-                   (lset-union eq?
-                               propagators-inbox
-                               (filter (lambda (propagator)
-                                         (rassoc cell-name
-                                                 (propagator-inputs propagator)))
-                                       (propnet-propagators propnet)))
-                   propagators-in-flight))))
-      ;; In order to minimize the number of times a propagator is run, it is
-      ;; important to start scheduling them only after all cells in
-      ;; cell-values-inbox are serviced.
-      (()
-       (match propagators-inbox
-         ;; Poll propagators in flight and update cell values if any of them are
-         ;; done.
-         (()
-          (match propagators-in-flight
-            ;; All propagators are finished. The propnet has stabilized. We are
-            ;; done. Return all cell values.
-            (()
-             (state+status (propnet-state propnet
-                                          cells
-                                          cell-values-inbox
-                                          propagators-in-flight
-                                          propagators-inbox)
-                           'completed))
-            ;; Propagators are still in flight. Check if any of them have
-            ;; completed.
-            (_
-             (let ((finished-propagators
-                    propagators-still-in-flight
-                    (partition-map (match-lambda
-                                     ((_ _ 'completed) #t)
-                                     (_ #f))
-                                   (match-lambda
-                                     ((name state _)
-                                      (cons name state)))
-                                   (map (match-lambda
+                   (state-return
+                    (lset-union eq?
+                                propagators-inbox
+                                (filter (lambda (propagator)
+                                          (rassoc cell-name
+                                                  (propagator-inputs propagator)))
+                                        (propnet-propagators propnet))))
+                   mpropagators-in-flight)))
+        ;; In order to minimize the number of times a propagator is run, it is
+        ;; important to start scheduling them only after all cells in
+        ;; cell-values-inbox are serviced.
+        (()
+         (match propagators-inbox
+           ;; Poll propagators in flight and update cell values if any of them
+           ;; are done.
+           (()
+            (match propagators-in-flight
+              ;; All propagators are finished. The propnet has stabilized. We
+              ;; are done. Return state and completed status.
+              (()
+               (state-return
+                (state+status (propnet-state propnet
+                                             mcells
+                                             mcell-values-inbox
+                                             mpropagators-in-flight
+                                             mpropagators-inbox)
+                              'completed)))
+              ;; Propagators are still in flight. Check if any of them have
+              ;; completed.
+              (_
+               (state-let* ((propagator-states
+                             (state-map (match-lambda
                                           ((name . state)
-                                           (let ((status state ((scheduler-poll scheduler)
-                                                                state)))
-                                             (list name state status))))
-                                        propagators-in-flight))))
-               (match finished-propagators
-                 ;; None of the propagators we checked have completed. Return a
-                 ;; pending state.
-                 (()
-                  (state+status (propnet-state propnet
-                                               cells
-                                               cell-values-inbox
-                                               propagators-still-in-flight
-                                               propagators-inbox)
-                                'pending))
-                 ;; Some of the propagators we checked have completed. Enqueue
-                 ;; their outputs in the cells inbox and loop.
-                 (_
-                  (loop cells
-                        (apply assoc-set
-                               cell-values-inbox
-                               (append-map propagator-state->cell-values
-                                           finished-propagators))
-                        propagators-inbox
-                        propagators-still-in-flight)))))))
-         ;; Schedule propagators in inbox.
-         (_
-          (loop cells
-                cell-values-inbox
-                (list)
-                ;; We don't need to cancel or forget about previous runs of the
-                ;; same propagator because cells only *accumulate* information;
-                ;; they never remove it. Any previous runs of the same
-                ;; propagator will only *add to* the information in the output
-                ;; cells. Previous runs may be closer to completion and taking
-                ;; advantage of their output may allow later stages to start
-                ;; running sooner, thus improving throughput. In our CWL
-                ;; application of propnets, this will never result in the same
-                ;; step being recomputed; so this approach does not come at a
-                ;; higher computational cost.
-                (append (schedule-propagators propagators-inbox cells)
-                        propagators-in-flight))))))))
+                                           (state-let* ((state+status
+                                                         ((scheduler-poll scheduler) state)))
+                                             (state-return
+                                              (list name
+                                                    (state+status-state state+status)
+                                                    (state+status-status state+status))))))
+                                        propagators-in-flight)))
+                 (let ((mfinished-propagators
+                        mpropagators-still-in-flight
+                        (call-with-values (cut partition
+                                               (match-lambda
+                                                 ((_ _ 'completed) #t)
+                                                 (_ #f))
+                                               propagator-states)
+                          (lambda lists
+                            (apply values
+                                   (map (cut state-map
+                                             (match-lambda
+                                               ((name state _)
+                                                (state-return (cons name state))))
+                                             <>)
+                                        lists))))))
+                   (state-let* ((finished-propagators mfinished-propagators))
+                     (match finished-propagators
+                       ;; None of the propagators we checked have completed.
+                       ;; Return a pending state.
+                       (()
+                        (state-return
+                         (state+status (propnet-state propnet
+                                                      mcells
+                                                      mcell-values-inbox
+                                                      mpropagators-still-in-flight
+                                                      mpropagators-inbox)
+                                       'pending)))
+                       ;; Some of the propagators we checked have completed.
+                       ;; Enqueue their outputs in the cells inbox and loop.
+                       (_
+                        (loop mcells
+                              (state-let* ((new-cell-values
+                                            (state-append-map propagator-state->cell-values
+                                                              finished-propagators)))
+                                (state-return (apply assoc-set
+                                                     cell-values-inbox
+                                                     new-cell-values)))
+                              mpropagators-inbox
+                              mpropagators-still-in-flight)))))))))
+           ;; Schedule propagators in inbox.
+           (_
+            (loop mcells
+                  mcell-values-inbox
+                  (state-return (list))
+                  ;; We don't need to cancel or forget about previous runs of
+                  ;; the same propagator because cells only *accumulate*
+                  ;; information; they never remove it. Any previous runs of the
+                  ;; same propagator will only *add to* the information in the
+                  ;; output cells. Previous runs may be closer to completion and
+                  ;; taking advantage of their output may allow later stages to
+                  ;; start running sooner, thus improving throughput. In our CWL
+                  ;; application of propnets, this will never result in the same
+                  ;; step being recomputed; so this approach does not come at a
+                  ;; higher computational cost.
+                  (state-let* ((new-propagators-in-flight
+                                (schedule-propagators propagators-inbox
+                                                      cells)))
+                    (state-return
+                     (append new-propagators-in-flight
+                             propagators-in-flight)))))))))))
 
 (define (capture-propnet-output state)
-  "Return output of propagator network @var{state}."
+  "Return output of propagator network @var{state} as a state-monadic value."
   (propnet-state-cells state))
diff --git a/ravanan/workflow.scm b/ravanan/workflow.scm
index 3692ede..158edcd 100644
--- a/ravanan/workflow.scm
+++ b/ravanan/workflow.scm
@@ -249,8 +249,8 @@ propagator."
                              #:key guix-daemon-socket)
   (define (schedule proc inputs scheduler)
     "Schedule @var{proc} with inputs from the @var{inputs} association list. Return a
-job state object. @var{proc} may either be a @code{<propnet>} object or a
-@code{<scheduler-proc>} object."
+state-monadic job state object. @var{proc} may either be a @code{<propnet>}
+object or a @code{<scheduler-proc>} object."
     (let* ((name (scheduler-proc-name proc))
            (cwl (scheduler-proc-cwl proc))
            (scatter (from-maybe (scheduler-proc-scatter proc)
@@ -261,7 +261,7 @@ job state object. @var{proc} may either be a @code{<propnet>} object or a
       (if scatter
           (case scatter-method
             ((dot-product)
-             (apply map
+             (apply state-map
                     (lambda input-elements
                       ;; Recurse with scattered inputs spliced in.
                       (schedule (scheduler-proc name cwl %nothing %nothing)
@@ -303,18 +303,21 @@ job state object. @var{proc} may either be a @code{<propnet>} object or a
              ((string=? class "ExpressionTool")
               (error "Workflow class not implemented yet" class))
              ((string=? class "Workflow")
-              (state-return
-               (workflow-state (schedule-propnet (workflow-class->propnet name
-                                                                          cwl
-                                                                          scheduler
-                                                                          batch-system)
-                                                 inputs)
-                               (assoc-ref* cwl "outputs")))))))))
+              (state-let* ((propnet-state
+                            (schedule-propnet (workflow-class->propnet name
+                                                                       cwl
+                                                                       scheduler
+                                                                       batch-system)
+                                              inputs)))
+                (state-return
+                 (workflow-state propnet-state
+                                 (assoc-ref* cwl "outputs"))))))))))
 
   (define (poll state)
     "Return updated state and current status of job @var{state} object as a
-@code{<state+status>} object. The status is one of the symbols @code{pending} or
-@code{completed}."
+state-monadic @code{<state+status>} object. The status is one of the symbols
+@code{pending} or @code{completed}. Within the monadic value, raise an exception
+and exit if job has failed."
     (guard (c ((job-failure? c)
                (let ((script (job-failure-script c)))
                  (user-error
@@ -326,31 +329,34 @@ job state object. @var{proc} may either be a @code{<propnet>} object or a
        ;; Return list states as completed only if all state elements in it are
        ;; completed.
        ((list? state)
-        (if (every (lambda (state+status)
-                     (eq? (state+status-status state+status)
-                          'completed))
-                   polled-states)
-            (state+status state 'completed)
-            (state+status state 'pending)))
+        (state-let* ((polled-states (state-map poll state)))
+          (state-return
+           (if (every (lambda (state+status)
+                        (eq? (state+status-status state+status)
+                             'completed))
+                      polled-states)
+               (state+status state 'completed)
+               (state+status state 'pending)))))
        ;; Poll job state. Raise an exception if the job has failed.
        ((command-line-tool-state? state)
-        (state+status state
-                      (case (job-state-status (command-line-tool-state-job-state state)
-                                              batch-system)
-                        ((failed)
-                         (raise-exception (job-failure
-                                           (job-state-script
-                                            (command-line-tool-state-job-state state)))))
-                        (else => identity))))
+        (let ((job-state (command-line-tool-state-job-state state)))
+          (state-let* ((status (job-state-status job-state batch-system)))
+            (state-return
+             (state+status state
+                           (case status
+                             ((failed)
+                              (raise-exception (job-failure
+                                                (job-state-script job-state))))
+                             (else => identity)))))))
        ;; Poll sub-workflow state. We do not need to check the status here since
        ;; job failures only occur at the level of a CommandLineTool.
        ((workflow-state? state)
-        (let ((updated-state+status
-               (poll-propnet (workflow-state-propnet-state state))))
-          (state+status
-           (set-workflow-state-propnet-state state
-                                             (state+status-state updated-state+status))
-           (state+status-status updated-state+status))))
+        (state-let* ((updated-state+status
+                      (poll-propnet (workflow-state-propnet-state state))))
+          (state-return
+           (state+status (set-workflow-state-propnet-state
+                          state (state+status-state updated-state+status))
+                         (state+status-status updated-state+status)))))
        (else
         (assertion-violation state "Invalid state")))))
 
@@ -388,36 +394,42 @@ is the class of the workflow."
     "Return output of completed job @var{state}."
     (cond
      ((workflow-state? state)
-      (filter-outputs "Workflow"
-                      (capture-propnet-output
-                       (workflow-state-propnet-state state))
-                      (workflow-state-formal-outputs state)))
+      (state-let* ((outputs (capture-propnet-output
+                             (workflow-state-propnet-state state))))
+        (state-return
+         (filter-outputs "Workflow"
+                         outputs
+                         (workflow-state-formal-outputs state)))))
      ((list? state)
       ;; Combine outputs from individual state elements.
-      (match (map capture-output state)
-        ((and (head-output _ ...)
-              outputs)
-         (map (match-lambda
-                ((id . value)
-                 (cons id
-                       (map->vector (lambda (output)
-                                      ;; FIXME: Is this the correct way to
-                                      ;; handle missing outputs?
-                                      (or (assoc-ref output id)
-                                          'null))
-                                    outputs))))
-              head-output))))
+      (state-let* ((captured-outputs (state-map capture-output state)))
+        (match captured-outputs
+          ((and (head-output _ ...)
+                outputs)
+           (state-return
+            (map (match-lambda
+                   ((id . value)
+                    (cons id
+                          (map->vector (lambda (output)
+                                         ;; FIXME: Is this the correct way to
+                                         ;; handle missing outputs?
+                                         (or (assoc-ref output id)
+                                             'null))
+                                       outputs))))
+                 head-output))))))
      (else
       ;; Log progress and return captured output.
       (let ((script (job-state-script (command-line-tool-state-job-state state))))
-        (format (current-error-port)
-                "~a completed; logs at ~a and ~a~%"
-                script
-                (script->store-stdout-file script store)
-                (script->store-stderr-file script store))
-        (filter-outputs "CommandLineTool"
-                        (capture-command-line-tool-output script store)
-                        (command-line-tool-state-formal-outputs state))))))
+        (state-return
+         (begin
+           (format (current-error-port)
+                   "~a completed; logs at ~a and ~a~%"
+                   script
+                   (script->store-stdout-file script store)
+                   (script->store-stderr-file script store))
+           (filter-outputs "CommandLineTool"
+                           (capture-command-line-tool-output script store)
+                           (command-line-tool-state-formal-outputs state))))))))
 
   (scheduler schedule poll capture-output))
 
@@ -612,25 +624,27 @@ area need not be shared. @var{store} is the path to the shared ravanan store.
   (let ((scheduler (workflow-scheduler
                     manifest-file channels scratch store batch-system
                     #:guix-daemon-socket guix-daemon-socket)))
-    (let loop ((state (run-with-state
-                       ((scheduler-schedule scheduler)
-                        (scheduler-proc name cwl %nothing %nothing)
-                        inputs
-                        scheduler))))
-      ;; Poll.
-      (let ((state+status ((scheduler-poll scheduler) state)))
-        (if (eq? (state+status-status state+status) 'pending)
-            (begin
-              ;; Pause before looping and polling again so we don't bother the
-              ;; job server too often.
-              (sleep (cond
+    (run-with-state
+     (let loop ((mstate ((scheduler-schedule scheduler)
+                         (scheduler-proc name cwl %nothing %nothing)
+                         inputs
+                         scheduler)))
+       ;; Poll.
+       (state-let* ((state mstate)
+                    (state+status ((scheduler-poll scheduler) state)))
+         (if (eq? (state+status-status state+status)
+                  'pending)
+             (begin
+               ;; Pause before looping and polling again so we don't bother the
+               ;; job server too often.
+               (sleep (cond
                        ;; Single machine jobs are run synchronously. So, there
                        ;; is no need to wait to poll them.
                        ((eq? batch-system 'single-machine)
                         0)
                        ((slurm-api-batch-system? batch-system)
                         %job-poll-interval)))
-              (loop (state+status-state state+status)))
-            ;; Capture outputs.
-            ((scheduler-capture-output scheduler)
-             (state+status-state state+status)))))))
+               (loop (state-return (state+status-state state+status))))
+             ;; Capture outputs.
+             ((scheduler-capture-output scheduler)
+              (state+status-state state+status))))))))