aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ravanan/workflow.scm437
1 files changed, 232 insertions, 205 deletions
diff --git a/ravanan/workflow.scm b/ravanan/workflow.scm
index 938ddf0..90c4d4c 100644
--- a/ravanan/workflow.scm
+++ b/ravanan/workflow.scm
@@ -17,6 +17,7 @@
;;; along with ravanan. If not, see <https://www.gnu.org/licenses/>.
(define-module (ravanan workflow)
+ #:use-module ((rnrs base) #:select (assertion-violation))
#:use-module ((rnrs conditions) #:select (define-condition-type))
#:use-module (rnrs exceptions)
#:use-module (srfi srfi-1)
@@ -60,6 +61,20 @@
(scatter scheduler-proc-scatter)
(scatter-method scheduler-proc-scatter-method))
+(define-immutable-record-type <command-line-tool-state>
+ (command-line-tool-state job-state formal-outputs)
+ command-line-tool-state?
+ (job-state command-line-tool-state-job-state
+ set-command-line-tool-state-job-state)
+ (formal-outputs command-line-tool-state-formal-outputs))
+
+(define-immutable-record-type <workflow-state>
+ (workflow-state propnet-state formal-outputs)
+ workflow-state?
+ (propnet-state workflow-state-propnet-state
+ set-workflow-state-propnet-state)
+ (formal-outputs workflow-state-formal-outputs))
+
(define (value=? maybe-val1 maybe-val2)
"Return @code{#t} if maybe-monadic values @var{maybe-val1} and
@var{maybe-val2} are the same value. Else, return @code{#f}."
@@ -174,141 +189,117 @@ propagator."
(assoc-ref output "id")))
(assoc-ref cwl "outputs"))))
-(define (workflow->command-line-tool-steps cwl)
- "Recursively traverse @var{cwl} tree and return a list of
-@code{CommandLineTool} class steps."
- (define (inherit-defaults step-run step-in)
- "Augment the @var{step-run} CWL tree with defaults from the containing
-workflow. @var{step-in} is the inputs mapping association list of the step."
- (assoc-set step-run
- (cons "inputs"
- (vector-map (lambda (input)
- (let* ((input-id (assoc-ref input "id"))
- (input-source (assoc-ref step-in input-id)))
- (maybe-assoc-set input
- (cons "default"
- (maybe-assoc-ref (just cwl)
- "inputs"
- input-source
- "default")))))
- (assoc-ref* step-run "inputs")))))
-
- (vector-append-map->list (lambda (step)
- (let ((run (inherit-defaults (assoc-ref step "run")
- (assoc-ref step "in"))))
- (if (string=? (assoc-ref run "class")
- "CommandLineTool")
- (list (assoc-set step
- (cons "run" run)))
- (workflow->command-line-tool-steps
+(define* (workflow-class->propnet name cwl scheduler)
+ "Return a propagator network scheduled using @var{scheduler} for @var{cwl}, a
+@code{Workflow} class workflow with @var{name}."
+ (define (normalize-scatter-method scatter-method)
+ (assoc-ref* '(("dotproduct" . dot-product)
+ ("nested_crossproduct" . nested-cross-product)
+ ("flat_crossproduct" . flat-cross-product))
+ scatter-method))
+
+ (define (step->propagator step)
+ (let* ((step-id (assoc-ref* step "id"))
+ (step-propagator
+ (command-line-tool->propagator step-id (assoc-ref* step "run"))))
+ (propagator (propagator-name step-propagator)
+ (let ((proc (propagator-proc step-propagator)))
+ (scheduler-proc (scheduler-proc-name proc)
(inherit-requirements-and-hints
- run
+ (scheduler-proc-cwl proc)
(or (assoc-ref cwl "requirements")
#())
(or (assoc-ref cwl "hints")
#())
- (or (assoc-ref run "requirements")
+ (or (assoc-ref step "requirements")
#())
- (or (assoc-ref run "hints")
- #()))))))
- (assoc-ref cwl "steps")))
-
-(define* (workflow-class->propagators name cwl)
- "Return a list of propagators for @var{cwl}, a @code{Workflow} class
-workflow with @var{name}."
- (define (prefix-name prefix name)
- "Prefix @var{name} with @var{prefix} so as to keep subworkflows in
-their own namespaces."
- (string-append prefix "/" name))
-
- (map (lambda (step)
- (let* ((step-id (assoc-ref step "id"))
- (step-propagator
- (command-line-tool->propagator step-id
- (assoc-ref step "run"))))
- (propagator (prefix-name step-id
- (propagator-name step-propagator))
- ;; Augment proc with scatter and scatter method.
- (let ((proc (propagator-proc step-propagator)))
- (scheduler-proc
- (scheduler-proc-name proc)
- (scheduler-proc-cwl proc)
- (maybe-assoc-ref (just step) "scatter")
- (maybe-let* ((scatter-method
- (maybe-assoc-ref (just step) "scatterMethod")))
- (just (assoc-ref* '(("dotproduct" . dot-product)
- ("nested_crossproduct" . nested-cross-product)
- ("flat_crossproduct" . flat-cross-product))
- scatter-method)))))
- (assoc-ref step "in")
- (propagator-optional-inputs step-propagator)
- (vector-map->list (lambda (output-name)
- (cons output-name
- (prefix-name step-id output-name)))
- (assoc-ref step "out")))))
- (workflow->command-line-tool-steps cwl)))
-
-(define (workflow->propagators name cwl)
- "Return a list of propagators for @var{cwl}, a workflow with
-@var{name}."
- (let ((class (assoc-ref cwl "class")))
- ((cond
- ((string=? class "CommandLineTool")
- (compose list command-line-tool->propagator))
- ((string=? class "ExpressionTool")
- (error "Workflow class not implemented yet" class))
- ((string=? class "Workflow")
- (maybe-let* ((requirements (maybe-assoc-ref (just cwl) "requirements")))
- (check-requirements requirements %supported-requirements))
- (maybe-let* ((hints (maybe-assoc-ref (just cwl) "hints")))
- (check-requirements hints %supported-requirements #t))
- workflow-class->propagators)
- (else
- (error "Invalid workflow class" class)))
- name cwl)))
+ (or (assoc-ref step "hints")
+ #()))
+ (maybe-assoc-ref (just step) "scatter")
+ (maybe-bind (maybe-assoc-ref (just step) "scatterMethod")
+ (compose just normalize-scatter-method))))
+ (map (match-lambda
+ ((input-id . _)
+ (cons input-id
+ (json-ref step "in" input-id))))
+ (propagator-inputs step-propagator))
+ (propagator-optional-inputs step-propagator)
+ (filter-map (match-lambda
+ ((output . cell)
+ (and (vector-member output
+ (assoc-ref* step "out"))
+ (cons output
+ (string-append step-id "/" cell)))))
+ (propagator-outputs step-propagator)))))
+
+ (maybe-let* ((requirements (maybe-assoc-ref (just cwl) "requirements")))
+ (check-requirements requirements
+ %supported-requirements))
+ (maybe-let* ((hints (maybe-assoc-ref (just cwl) "hints")))
+ (check-requirements hints
+ %supported-requirements
+ #t))
+ (propnet (vector-map->list step->propagator
+ (assoc-ref* cwl "steps"))
+ value=?
+ merge-values
+ scheduler))
(define* (workflow-scheduler manifest scratch store batch-system
#:key guix-daemon-socket
slurm-api-endpoint slurm-jwt)
- (define (schedule proc inputs)
+ (define (schedule proc inputs scheduler)
"Schedule @var{proc} with inputs from the @var{inputs} association list. Return a
-job state object."
- (let ((name (scheduler-proc-name proc))
- (cwl (scheduler-proc-cwl proc))
- (scatter (from-maybe (scheduler-proc-scatter proc)
- #f))
- (scatter-method (from-maybe (scheduler-proc-scatter-method proc)
- #f)))
- (if scatter
- (case scatter-method
- ((dot-product)
- (apply vector-map
- (lambda input-elements
- ;; Recurse with scattered inputs spliced in.
- (schedule (scheduler-proc name cwl %nothing %nothing)
- ;; Replace scattered inputs with single
- ;; elements.
- (apply assoc-set
- inputs
- (map cons
- (vector->list scatter)
- input-elements))))
- ;; Extract values of scattered inputs.
- (vector-map->list (cut assoc-ref inputs <>)
- scatter)))
- ((nested-cross-product flat-cross-product)
- (error scatter-method
- "Scatter method not implemented yet")))
- (run-command-line-tool name
- manifest
- cwl
- inputs
- scratch
- store
- batch-system
- #:guix-daemon-socket guix-daemon-socket
- #:slurm-api-endpoint slurm-api-endpoint
- #:slurm-jwt slurm-jwt))))
+job state object. @var{proc} may either be a @code{<propnet>} object or a
+@code{<scheduler-proc>} object."
+ (let* ((name (scheduler-proc-name proc))
+ (cwl (scheduler-proc-cwl proc))
+ (scatter (from-maybe (scheduler-proc-scatter proc)
+ #f))
+ (scatter-method (from-maybe (scheduler-proc-scatter-method proc)
+ #f))
+ (class (assoc-ref* cwl "class")))
+ (cond
+ (scatter
+ (case scatter-method
+ ((dot-product)
+ (apply vector-map
+ (lambda input-elements
+ ;; Recurse with scattered inputs spliced in.
+ (schedule (scheduler-proc name cwl %nothing %nothing)
+ ;; Replace scattered inputs with single
+ ;; elements.
+ (apply assoc-set
+ inputs
+ (map cons
+ (vector->list scatter)
+ input-elements))
+ scheduler))
+ ;; Extract values of scattered inputs.
+ (vector-map->list (cut assoc-ref inputs <>)
+ scatter)))
+ ((nested-cross-product flat-cross-product)
+ (error scatter-method
+ "Scatter method not implemented yet"))))
+ ((string=? class "CommandLineTool")
+ (command-line-tool-state
+ (run-command-line-tool name
+ manifest
+ cwl
+ inputs
+ scratch
+ store
+ batch-system
+ #:guix-daemon-socket guix-daemon-socket
+ #:slurm-api-endpoint slurm-api-endpoint
+ #:slurm-jwt slurm-jwt)
+ (assoc-ref* cwl "outputs")))
+ ((string=? class "ExpressionTool")
+ (error "Workflow class not implemented yet" class))
+ ((string=? class "Workflow")
+ (workflow-state (schedule-propnet (workflow-class->propnet name cwl scheduler)
+ inputs)
+ (assoc-ref* cwl "outputs"))))))
(define (poll state)
"Return current status and updated state of job @var{state} object. The status is
@@ -321,40 +312,103 @@ exit if job has failed."
script
(script->store-stdout-file script store)
(script->store-stderr-file script store)))))
- (let ((status state (job-state-status state
- #:slurm-api-endpoint slurm-api-endpoint
- #:slurm-jwt slurm-jwt)))
- (values (case status
- ((failed)
- (raise-exception (job-failure (job-state-script state))))
- (else => identity))
- state))))
+ (cond
+ ;; Return vector states as completed only if all state elements in it are
+ ;; completed.
+ ((vector? state)
+ (let ((status state (vector-mapn poll state)))
+ (values (if (vector-every (cut eq? <> 'completed)
+ status)
+ 'completed
+ 'pending)
+ state)))
+ ;; Poll job state. Raise an exception if the job has failed.
+ ((command-line-tool-state? state)
+ (let ((status updated-job-state
+ (job-state-status (command-line-tool-state-job-state state)
+ #:slurm-api-endpoint slurm-api-endpoint
+ #:slurm-jwt slurm-jwt)))
+ (values (case status
+ ((failed)
+ (raise-exception (job-failure
+ (job-state-script
+ (command-line-tool-state-job-state state)))))
+ (else => identity))
+ (set-command-line-tool-state-job-state state updated-job-state))))
+ ;; Poll sub-workflow state. We do not need to check the status here since
+ ;; job failures only occur at the level of a CommandLineTool.
+ ((workflow-state? state)
+ (let ((status updated-propnet-state
+ (poll-propnet (workflow-state-propnet-state state))))
+ (values status
+ (set-workflow-state-propnet-state state updated-propnet-state))))
+ (else
+ (assertion-violation state "Invalid state")))))
+
+ (define (find-output class outputs formal-output)
+ "Find @var{formal-output} among @var{outputs}. @var{class} is the class of the
+workflow."
+ (let ((output-id (assoc-ref formal-output "id")))
+ (cond
+ ;; The output is present; cons and return it.
+ ((cond
+ ((string=? class "CommandLineTool")
+ (assoc-ref outputs output-id))
+ ((string=? class "ExpressionTool")
+ (error "Workflow class not implemented yet"
+ class))
+ ((string=? class "Workflow")
+ (assoc-ref outputs
+ (assoc-ref* formal-output "outputSource"))))
+ => (cut cons output-id <>))
+ ;; The output is absent; check if a null type is acceptable.
+ ((match-type 'null
+ (formal-parameter-type (assoc-ref* formal-output "type")))
+ #f)
+ ;; Else, error out.
+ (else
+ (error "output not found" output-id)))))
+
+ (define (filter-outputs class outputs formal-outputs)
+ "Filter @var{outputs} to only have outputs from @var{formal-outputs}. @var{class}
+is the class of the workflow."
+ (vector-filter-map->list (cut find-output class outputs <>)
+ formal-outputs))
(define (capture-output state)
"Return output of completed job @var{state}."
- (if (vector? state)
- ;; Combine outputs from individual state elements.
- (match (vector-map capture-output state)
- ((and #(head-output _ ...)
- outputs)
- (map (match-lambda
- ((id . value)
- (cons id
- (vector-map (lambda (output)
- ;; FIXME: Is this the correct way to
- ;; handle missing outputs?
- (or (assoc-ref output id)
- 'null))
- outputs))))
- head-output)))
- ;; Log progress and return captured output.
- (let ((script (job-state-script state)))
- (format (current-error-port)
- "~a completed; logs at ~a and ~a~%"
- script
- (script->store-stdout-file script store)
- (script->store-stderr-file script store))
- (capture-command-line-tool-output script store))))
+ (cond
+ ((workflow-state? state)
+ (filter-outputs "Workflow"
+ (capture-propnet-output
+ (workflow-state-propnet-state state))
+ (workflow-state-formal-outputs state)))
+ ((vector? state)
+ ;; Combine outputs from individual state elements.
+ (match (vector-map capture-output state)
+ ((and #(head-output _ ...)
+ outputs)
+ (map (match-lambda
+ ((id . value)
+ (cons id
+ (vector-map (lambda (output)
+ ;; FIXME: Is this the correct way to
+ ;; handle missing outputs?
+ (or (assoc-ref output id)
+ 'null))
+ outputs))))
+ head-output))))
+ (else
+ ;; Log progress and return captured output.
+ (let ((script (job-state-script (command-line-tool-state-job-state state))))
+ (format (current-error-port)
+ "~a completed; logs at ~a and ~a~%"
+ script
+ (script->store-stdout-file script store)
+ (script->store-stderr-file script store))
+ (filter-outputs "CommandLineTool"
+ (capture-command-line-tool-output script store)
+ (command-line-tool-state-formal-outputs state))))))
(scheduler schedule poll capture-output))
@@ -507,29 +561,6 @@ endpoint to connect to. @var{slurm-jwt}, a string, is the JWT token to
authenticate to the slurm API with. @var{slurm-api-endpoint} and
@var{slurm-jwt} are only used when @var{batch-system} is
@code{'slurm-api}."
- (define (capture-output cell-values output)
- (let ((output-id (assoc-ref output "id")))
- (cond
- ;; The output is present; cons and return it.
- ((let ((class (assoc-ref* cwl "class")))
- (cond
- ((string=? class "CommandLineTool")
- (assoc-ref cell-values output-id))
- ((string=? class "ExpressionTool")
- (error "Workflow class not implemented yet"
- class))
- ((string=? class "Workflow")
- (assoc-ref cell-values
- (assoc-ref* output "outputSource")))))
- => (cut cons output-id <>))
- ;; The output is absent; check if a null type is acceptable.
- ((match-type 'null
- (formal-parameter-type (assoc-ref* output "type")))
- #f)
- ;; Else, error out.
- (else
- (error "output not found" output-id)))))
-
(let ((inputs (resolve-inputs inputs (assoc-ref* cwl "inputs") store)))
;; Ensure required inputs are specified.
(vector-for-each (lambda (input)
@@ -539,30 +570,26 @@ authenticate to the slurm API with. @var{slurm-api-endpoint} and
(user-error "Required input `~a' not specified"
input-id))))
(assoc-ref cwl "inputs"))
- (let loop ((state (schedule-propnet
- (propnet (workflow->propagators name cwl)
- value=?
- merge-values
- (workflow-scheduler
- manifest scratch store batch-system
- #:guix-daemon-socket guix-daemon-socket
- #:slurm-api-endpoint slurm-api-endpoint
- #:slurm-jwt slurm-jwt))
- inputs)))
- ;; Poll.
- (let ((status state (poll-propnet state)))
- (if (eq? status 'pending)
- (begin
- ;; Pause before looping and polling again so we don't bother the job
- ;; server too often.
- (sleep (case batch-system
- ;; Single machine jobs are run synchronously. So, there is
- ;; no need to wait to poll them.
- ((single-machine) 0)
- ((slurm-api) %job-poll-interval)))
- (loop state))
- ;; Capture outputs.
- (vector-filter-map->list (cute capture-output
- (capture-propnet-output state)
- <>)
- (assoc-ref* cwl "outputs")))))))
+ (let ((scheduler (workflow-scheduler
+ manifest scratch store batch-system
+ #:guix-daemon-socket guix-daemon-socket
+ #:slurm-api-endpoint slurm-api-endpoint
+ #:slurm-jwt slurm-jwt)))
+ (let loop ((state ((scheduler-schedule scheduler)
+ (scheduler-proc name cwl %nothing %nothing)
+ inputs
+ scheduler)))
+ ;; Poll.
+ (let ((status state ((scheduler-poll scheduler) state)))
+ (if (eq? status 'pending)
+ (begin
+ ;; Pause before looping and polling again so we don't bother the
+ ;; job server too often.
+ (sleep (case batch-system
+ ;; Single machine jobs are run synchronously. So, there
+ ;; is no need to wait to poll them.
+ ((single-machine) 0)
+ ((slurm-api) %job-poll-interval)))
+ (loop state))
+ ;; Capture outputs.
+ ((scheduler-capture-output scheduler) state)))))))