about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--ravanan/workflow.scm437
1 files changed, 232 insertions, 205 deletions
diff --git a/ravanan/workflow.scm b/ravanan/workflow.scm
index 938ddf0..90c4d4c 100644
--- a/ravanan/workflow.scm
+++ b/ravanan/workflow.scm
@@ -17,6 +17,7 @@
 ;;; along with ravanan.  If not, see <https://www.gnu.org/licenses/>.
 
 (define-module (ravanan workflow)
+  #:use-module ((rnrs base) #:select (assertion-violation))
   #:use-module ((rnrs conditions) #:select (define-condition-type))
   #:use-module (rnrs exceptions)
   #:use-module (srfi srfi-1)
@@ -60,6 +61,20 @@
   (scatter scheduler-proc-scatter)
   (scatter-method scheduler-proc-scatter-method))
 
+(define-immutable-record-type <command-line-tool-state>
+  (command-line-tool-state job-state formal-outputs)
+  command-line-tool-state?
+  (job-state command-line-tool-state-job-state
+             set-command-line-tool-state-job-state)
+  (formal-outputs command-line-tool-state-formal-outputs))
+
+(define-immutable-record-type <workflow-state>
+  (workflow-state propnet-state formal-outputs)
+  workflow-state?
+  (propnet-state workflow-state-propnet-state
+                 set-workflow-state-propnet-state)
+  (formal-outputs workflow-state-formal-outputs))
+
 (define (value=? maybe-val1 maybe-val2)
   "Return @code{#t} if maybe-monadic values @var{maybe-val1} and
 @var{maybe-val2} are the same value. Else, return @code{#f}."
@@ -174,141 +189,117 @@ propagator."
                                         (assoc-ref output "id")))
                                 (assoc-ref cwl "outputs"))))
 
-(define (workflow->command-line-tool-steps cwl)
-  "Recursively traverse @var{cwl} tree and return a list of
-@code{CommandLineTool} class steps."
-  (define (inherit-defaults step-run step-in)
-    "Augment the @var{step-run} CWL tree with defaults from the containing
-workflow. @var{step-in} is the inputs mapping association list of the step."
-    (assoc-set step-run
-      (cons "inputs"
-            (vector-map (lambda (input)
-                          (let* ((input-id (assoc-ref input "id"))
-                                 (input-source (assoc-ref step-in input-id)))
-                            (maybe-assoc-set input
-                              (cons "default"
-                                    (maybe-assoc-ref (just cwl)
-                                                     "inputs"
-                                                     input-source
-                                                     "default")))))
-                        (assoc-ref* step-run "inputs")))))
-
-  (vector-append-map->list (lambda (step)
-                             (let ((run (inherit-defaults (assoc-ref step "run")
-                                                          (assoc-ref step "in"))))
-                               (if (string=? (assoc-ref run "class")
-                                             "CommandLineTool")
-                                   (list (assoc-set step
-                                           (cons "run" run)))
-                                   (workflow->command-line-tool-steps
+(define* (workflow-class->propnet name cwl scheduler)
+  "Return a propagator network scheduled using @var{scheduler} for @var{cwl}, a
+@code{Workflow} class workflow with @var{name}."
+  (define (normalize-scatter-method scatter-method)
+    (assoc-ref* '(("dotproduct" . dot-product)
+                  ("nested_crossproduct" . nested-cross-product)
+                  ("flat_crossproduct" . flat-cross-product))
+                scatter-method))
+
+  (define (step->propagator step)
+    (let* ((step-id (assoc-ref* step "id"))
+           (step-propagator
+            (command-line-tool->propagator step-id (assoc-ref* step "run"))))
+      (propagator (propagator-name step-propagator)
+                  (let ((proc (propagator-proc step-propagator)))
+                    (scheduler-proc (scheduler-proc-name proc)
                                     (inherit-requirements-and-hints
-                                     run
+                                     (scheduler-proc-cwl proc)
                                      (or (assoc-ref cwl "requirements")
                                          #())
                                      (or (assoc-ref cwl "hints")
                                          #())
-                                     (or (assoc-ref run "requirements")
+                                     (or (assoc-ref step "requirements")
                                          #())
-                                     (or (assoc-ref run "hints")
-                                         #()))))))
-                           (assoc-ref cwl "steps")))
-
-(define* (workflow-class->propagators name cwl)
-  "Return a list of propagators for @var{cwl}, a @code{Workflow} class
-workflow with @var{name}."
-  (define (prefix-name prefix name)
-    "Prefix @var{name} with @var{prefix} so as to keep subworkflows in
-their own namespaces."
-    (string-append prefix "/" name))
-
-  (map (lambda (step)
-         (let* ((step-id (assoc-ref step "id"))
-                (step-propagator
-                 (command-line-tool->propagator step-id
-                                                (assoc-ref step "run"))))
-           (propagator (prefix-name step-id
-                                    (propagator-name step-propagator))
-                       ;; Augment proc with scatter and scatter method.
-                       (let ((proc (propagator-proc step-propagator)))
-                         (scheduler-proc
-                          (scheduler-proc-name proc)
-                          (scheduler-proc-cwl proc)
-                          (maybe-assoc-ref (just step) "scatter")
-                          (maybe-let* ((scatter-method
-                                        (maybe-assoc-ref (just step) "scatterMethod")))
-                            (just (assoc-ref* '(("dotproduct" . dot-product)
-                                                ("nested_crossproduct" . nested-cross-product)
-                                                ("flat_crossproduct" . flat-cross-product))
-                                              scatter-method)))))
-                       (assoc-ref step "in")
-                       (propagator-optional-inputs step-propagator)
-                       (vector-map->list (lambda (output-name)
-                                           (cons output-name
-                                                 (prefix-name step-id output-name)))
-                                         (assoc-ref step "out")))))
-       (workflow->command-line-tool-steps cwl)))
-
-(define (workflow->propagators name cwl)
-  "Return a list of propagators for @var{cwl}, a workflow with
-@var{name}."
-  (let ((class (assoc-ref cwl "class")))
-    ((cond
-      ((string=? class "CommandLineTool")
-       (compose list command-line-tool->propagator))
-      ((string=? class "ExpressionTool")
-       (error "Workflow class not implemented yet" class))
-      ((string=? class "Workflow")
-       (maybe-let* ((requirements (maybe-assoc-ref (just cwl) "requirements")))
-         (check-requirements requirements %supported-requirements))
-       (maybe-let* ((hints (maybe-assoc-ref (just cwl) "hints")))
-         (check-requirements hints %supported-requirements #t))
-       workflow-class->propagators)
-      (else
-       (error "Invalid workflow class" class)))
-     name cwl)))
+                                     (or (assoc-ref step "hints")
+                                         #()))
+                                    (maybe-assoc-ref (just step) "scatter")
+                                    (maybe-bind (maybe-assoc-ref (just step) "scatterMethod")
+                                                (compose just normalize-scatter-method))))
+                  (map (match-lambda
+                         ((input-id . _)
+                          (cons input-id
+                                (json-ref step "in" input-id))))
+                       (propagator-inputs step-propagator))
+                  (propagator-optional-inputs step-propagator)
+                  (filter-map (match-lambda
+                                ((output . cell)
+                                 (and (vector-member output
+                                                     (assoc-ref* step "out"))
+                                      (cons output
+                                            (string-append step-id "/" cell)))))
+                              (propagator-outputs step-propagator)))))
+
+  (maybe-let* ((requirements (maybe-assoc-ref (just cwl) "requirements")))
+    (check-requirements requirements
+                        %supported-requirements))
+  (maybe-let* ((hints (maybe-assoc-ref (just cwl) "hints")))
+    (check-requirements hints
+                        %supported-requirements
+                        #t))
+  (propnet (vector-map->list step->propagator
+                             (assoc-ref* cwl "steps"))
+           value=?
+           merge-values
+           scheduler))
 
 (define* (workflow-scheduler manifest scratch store batch-system
                              #:key guix-daemon-socket
                              slurm-api-endpoint slurm-jwt)
-  (define (schedule proc inputs)
+  (define (schedule proc inputs scheduler)
     "Schedule @var{proc} with inputs from the @var{inputs} association list. Return a
-job state object."
-    (let ((name (scheduler-proc-name proc))
-          (cwl (scheduler-proc-cwl proc))
-          (scatter (from-maybe (scheduler-proc-scatter proc)
-                               #f))
-          (scatter-method (from-maybe (scheduler-proc-scatter-method proc)
-                                      #f)))
-      (if scatter
-          (case scatter-method
-            ((dot-product)
-             (apply vector-map
-                    (lambda input-elements
-                      ;; Recurse with scattered inputs spliced in.
-                      (schedule (scheduler-proc name cwl %nothing %nothing)
-                                ;; Replace scattered inputs with single
-                                ;; elements.
-                                (apply assoc-set
-                                       inputs
-                                       (map cons
-                                            (vector->list scatter)
-                                            input-elements))))
-                    ;; Extract values of scattered inputs.
-                    (vector-map->list (cut assoc-ref inputs <>)
-                                      scatter)))
-            ((nested-cross-product flat-cross-product)
-             (error scatter-method
-                    "Scatter method not implemented yet")))
-          (run-command-line-tool name
-                                 manifest
-                                 cwl
-                                 inputs
-                                 scratch
-                                 store
-                                 batch-system
-                                 #:guix-daemon-socket guix-daemon-socket
-                                 #:slurm-api-endpoint slurm-api-endpoint
-                                 #:slurm-jwt slurm-jwt))))
+job state object. @var{proc} may either be a @code{<propnet>} object or a
+@code{<scheduler-proc>} object."
+    (let* ((name (scheduler-proc-name proc))
+           (cwl (scheduler-proc-cwl proc))
+           (scatter (from-maybe (scheduler-proc-scatter proc)
+                                #f))
+           (scatter-method (from-maybe (scheduler-proc-scatter-method proc)
+                                       #f))
+           (class (assoc-ref* cwl "class")))
+      (cond
+       (scatter
+        (case scatter-method
+          ((dot-product)
+           (apply vector-map
+                  (lambda input-elements
+                    ;; Recurse with scattered inputs spliced in.
+                    (schedule (scheduler-proc name cwl %nothing %nothing)
+                              ;; Replace scattered inputs with single
+                              ;; elements.
+                              (apply assoc-set
+                                     inputs
+                                     (map cons
+                                          (vector->list scatter)
+                                          input-elements))
+                              scheduler))
+                  ;; Extract values of scattered inputs.
+                  (vector-map->list (cut assoc-ref inputs <>)
+                                    scatter)))
+          ((nested-cross-product flat-cross-product)
+           (error scatter-method
+                  "Scatter method not implemented yet"))))
+       ((string=? class "CommandLineTool")
+        (command-line-tool-state
+         (run-command-line-tool name
+                                manifest
+                                cwl
+                                inputs
+                                scratch
+                                store
+                                batch-system
+                                #:guix-daemon-socket guix-daemon-socket
+                                #:slurm-api-endpoint slurm-api-endpoint
+                                #:slurm-jwt slurm-jwt)
+         (assoc-ref* cwl "outputs")))
+       ((string=? class "ExpressionTool")
+        (error "Workflow class not implemented yet" class))
+       ((string=? class "Workflow")
+        (workflow-state (schedule-propnet (workflow-class->propnet name cwl scheduler)
+                                          inputs)
+                        (assoc-ref* cwl "outputs"))))))
 
   (define (poll state)
     "Return current status and updated state of job @var{state} object. The status is
@@ -321,40 +312,103 @@ exit if job has failed."
                   script
                   (script->store-stdout-file script store)
                   (script->store-stderr-file script store)))))
-      (let ((status state (job-state-status state
-                                            #:slurm-api-endpoint slurm-api-endpoint
-                                            #:slurm-jwt slurm-jwt)))
-        (values (case status
-                  ((failed)
-                   (raise-exception (job-failure (job-state-script state))))
-                  (else => identity))
-                state))))
+      (cond
+       ;; Return vector states as completed only if all state elements in it are
+       ;; completed.
+       ((vector? state)
+        (let ((status state (vector-mapn poll state)))
+          (values (if (vector-every (cut eq? <> 'completed)
+                                    status)
+                      'completed
+                      'pending)
+                  state)))
+       ;; Poll job state. Raise an exception if the job has failed.
+       ((command-line-tool-state? state)
+        (let ((status updated-job-state
+                      (job-state-status (command-line-tool-state-job-state state)
+                                        #:slurm-api-endpoint slurm-api-endpoint
+                                        #:slurm-jwt slurm-jwt)))
+          (values (case status
+                    ((failed)
+                     (raise-exception (job-failure
+                                       (job-state-script
+                                        (command-line-tool-state-job-state state)))))
+                    (else => identity))
+                  (set-command-line-tool-state-job-state state updated-job-state))))
+       ;; Poll sub-workflow state. We do not need to check the status here since
+       ;; job failures only occur at the level of a CommandLineTool.
+       ((workflow-state? state)
+        (let ((status updated-propnet-state
+                      (poll-propnet (workflow-state-propnet-state state))))
+          (values status
+                  (set-workflow-state-propnet-state state updated-propnet-state))))
+       (else
+        (assertion-violation state "Invalid state")))))
+
+  (define (find-output class outputs formal-output)
+    "Find @var{formal-output} among @var{outputs}. @var{class} is the class of the
+workflow."
+    (let ((output-id (assoc-ref formal-output "id")))
+      (cond
+       ;; The output is present; cons and return it.
+       ((cond
+         ((string=? class "CommandLineTool")
+          (assoc-ref outputs output-id))
+         ((string=? class "ExpressionTool")
+          (error "Workflow class not implemented yet"
+                 class))
+         ((string=? class "Workflow")
+          (assoc-ref outputs
+                     (assoc-ref* formal-output "outputSource"))))
+        => (cut cons output-id <>))
+       ;; The output is absent; check if a null type is acceptable.
+       ((match-type 'null
+                    (formal-parameter-type (assoc-ref* formal-output "type")))
+        #f)
+       ;; Else, error out.
+       (else
+        (error "output not found" output-id)))))
+
+  (define (filter-outputs class outputs formal-outputs)
+    "Filter @var{outputs} to only have outputs from @var{formal-outputs}. @var{class}
+is the class of the workflow."
+    (vector-filter-map->list (cut find-output class outputs <>)
+                             formal-outputs))
 
   (define (capture-output state)
     "Return output of completed job @var{state}."
-    (if (vector? state)
-        ;; Combine outputs from individual state elements.
-        (match (vector-map capture-output state)
-          ((and #(head-output _ ...)
-                outputs)
-           (map (match-lambda
-                  ((id . value)
-                   (cons id
-                         (vector-map (lambda (output)
-                                       ;; FIXME: Is this the correct way to
-                                       ;; handle missing outputs?
-                                       (or (assoc-ref output id)
-                                           'null))
-                                     outputs))))
-                head-output)))
-        ;; Log progress and return captured output.
-        (let ((script (job-state-script state)))
-          (format (current-error-port)
-                  "~a completed; logs at ~a and ~a~%"
-                  script
-                  (script->store-stdout-file script store)
-                  (script->store-stderr-file script store))
-          (capture-command-line-tool-output script store))))
+    (cond
+     ((workflow-state? state)
+      (filter-outputs "Workflow"
+                      (capture-propnet-output
+                       (workflow-state-propnet-state state))
+                      (workflow-state-formal-outputs state)))
+     ((vector? state)
+      ;; Combine outputs from individual state elements.
+      (match (vector-map capture-output state)
+        ((and #(head-output _ ...)
+              outputs)
+         (map (match-lambda
+                ((id . value)
+                 (cons id
+                       (vector-map (lambda (output)
+                                     ;; FIXME: Is this the correct way to
+                                     ;; handle missing outputs?
+                                     (or (assoc-ref output id)
+                                         'null))
+                                   outputs))))
+              head-output))))
+     (else
+      ;; Log progress and return captured output.
+      (let ((script (job-state-script (command-line-tool-state-job-state state))))
+        (format (current-error-port)
+                "~a completed; logs at ~a and ~a~%"
+                script
+                (script->store-stdout-file script store)
+                (script->store-stderr-file script store))
+        (filter-outputs "CommandLineTool"
+                        (capture-command-line-tool-output script store)
+                        (command-line-tool-state-formal-outputs state))))))
 
   (scheduler schedule poll capture-output))
 
@@ -507,29 +561,6 @@ endpoint to connect to. @var{slurm-jwt}, a string, is the JWT token to
 authenticate to the slurm API with. @var{slurm-api-endpoint} and
 @var{slurm-jwt} are only used when @var{batch-system} is
 @code{'slurm-api}."
-  (define (capture-output cell-values output)
-    (let ((output-id (assoc-ref output "id")))
-      (cond
-       ;; The output is present; cons and return it.
-       ((let ((class (assoc-ref* cwl "class")))
-          (cond
-           ((string=? class "CommandLineTool")
-            (assoc-ref cell-values output-id))
-           ((string=? class "ExpressionTool")
-            (error "Workflow class not implemented yet"
-                   class))
-           ((string=? class "Workflow")
-            (assoc-ref cell-values
-                       (assoc-ref* output "outputSource")))))
-        => (cut cons output-id <>))
-       ;; The output is absent; check if a null type is acceptable.
-       ((match-type 'null
-                    (formal-parameter-type (assoc-ref* output "type")))
-        #f)
-       ;; Else, error out.
-       (else
-        (error "output not found" output-id)))))
-
   (let ((inputs (resolve-inputs inputs (assoc-ref* cwl "inputs") store)))
     ;; Ensure required inputs are specified.
     (vector-for-each (lambda (input)
@@ -539,30 +570,26 @@ authenticate to the slurm API with. @var{slurm-api-endpoint} and
                            (user-error "Required input `~a' not specified"
                                        input-id))))
                      (assoc-ref cwl "inputs"))
-    (let loop ((state (schedule-propnet
-                       (propnet (workflow->propagators name cwl)
-                                value=?
-                                merge-values
-                                (workflow-scheduler
-                                 manifest scratch store batch-system
-                                 #:guix-daemon-socket guix-daemon-socket
-                                 #:slurm-api-endpoint slurm-api-endpoint
-                                 #:slurm-jwt slurm-jwt))
-                       inputs)))
-      ;; Poll.
-      (let ((status state (poll-propnet state)))
-        (if (eq? status 'pending)
-            (begin
-              ;; Pause before looping and polling again so we don't bother the job
-              ;; server too often.
-              (sleep (case batch-system
-                       ;; Single machine jobs are run synchronously. So, there is
-                       ;; no need to wait to poll them.
-                       ((single-machine) 0)
-                       ((slurm-api) %job-poll-interval)))
-              (loop state))
-            ;; Capture outputs.
-            (vector-filter-map->list (cute capture-output
-                                           (capture-propnet-output state)
-                                           <>)
-                                     (assoc-ref* cwl "outputs")))))))
+    (let ((scheduler (workflow-scheduler
+                      manifest scratch store batch-system
+                      #:guix-daemon-socket guix-daemon-socket
+                      #:slurm-api-endpoint slurm-api-endpoint
+                      #:slurm-jwt slurm-jwt)))
+      (let loop ((state ((scheduler-schedule scheduler)
+                         (scheduler-proc name cwl %nothing %nothing)
+                         inputs
+                         scheduler)))
+        ;; Poll.
+        (let ((status state ((scheduler-poll scheduler) state)))
+          (if (eq? status 'pending)
+              (begin
+                ;; Pause before looping and polling again so we don't bother the
+                ;; job server too often.
+                (sleep (case batch-system
+                         ;; Single machine jobs are run synchronously. So, there
+                         ;; is no need to wait to poll them.
+                         ((single-machine) 0)
+                         ((slurm-api) %job-poll-interval)))
+                (loop state))
+              ;; Capture outputs.
+              ((scheduler-capture-output scheduler) state)))))))