aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ravanan/workflow.scm169
1 files changed, 93 insertions, 76 deletions
diff --git a/ravanan/workflow.scm b/ravanan/workflow.scm
index 989c711..42fa349 100644
--- a/ravanan/workflow.scm
+++ b/ravanan/workflow.scm
@@ -259,50 +259,56 @@ job state object. @var{proc} may either be a @code{<propnet>} object or a
(scatter-method (from-maybe (scheduler-proc-scatter-method proc)
#f))
(class (assoc-ref* cwl "class")))
- (cond
- (scatter
- (case scatter-method
- ((dot-product)
- (apply vector-map
- (lambda input-elements
- ;; Recurse with scattered inputs spliced in.
- (schedule (scheduler-proc name cwl %nothing %nothing)
- ;; Replace scattered inputs with single
- ;; elements.
- (apply assoc-set
- inputs
- (map cons
- (vector->list scatter)
- input-elements))
- scheduler))
- ;; Extract values of scattered inputs.
- (vector-map->list (cut assoc-ref inputs <>)
- scatter)))
- ((nested-cross-product flat-cross-product)
- (error scatter-method
- "Scatter method not implemented yet"))))
- ((string=? class "CommandLineTool")
- (command-line-tool-state
- (run-command-line-tool name
- manifest
- cwl
- inputs
- scratch
- store
- batch-system
- #:guix-daemon-socket guix-daemon-socket
- #:slurm-api-endpoint slurm-api-endpoint
- #:slurm-jwt slurm-jwt)
- (assoc-ref* cwl "outputs")))
- ((string=? class "ExpressionTool")
- (error "Workflow class not implemented yet" class))
- ((string=? class "Workflow")
- (workflow-state (schedule-propnet (workflow-class->propnet name
- cwl
- scheduler
- batch-system)
- inputs)
- (assoc-ref* cwl "outputs"))))))
+ (if scatter
+ (case scatter-method
+ ((dot-product)
+ (apply vector-map
+ (lambda input-elements
+ ;; Recurse with scattered inputs spliced in.
+ (schedule (scheduler-proc name cwl %nothing %nothing)
+ ;; Replace scattered inputs with single
+ ;; elements.
+ (apply assoc-set
+ inputs
+ (map cons
+ (vector->list scatter)
+ input-elements))
+ scheduler))
+ ;; Extract values of scattered inputs.
+ (vector-map->list (cut assoc-ref inputs <>)
+ scatter)))
+ ((nested-cross-product flat-cross-product)
+ (error scatter-method
+ "Scatter method not implemented yet")))
+ (let* ((formal-inputs (assoc-ref* cwl "inputs"))
+ ;; We need to resolve inputs after adding defaults since the
+ ;; default values may contain partially specified File objects.
+ (inputs (resolve-inputs (add-defaults inputs formal-inputs)
+ formal-inputs
+ store)))
+ (cond
+ ((string=? class "CommandLineTool")
+ (command-line-tool-state
+ (run-command-line-tool name
+ manifest
+ cwl
+ inputs
+ scratch
+ store
+ batch-system
+ #:guix-daemon-socket guix-daemon-socket
+ #:slurm-api-endpoint slurm-api-endpoint
+ #:slurm-jwt slurm-jwt)
+ (assoc-ref* cwl "outputs")))
+ ((string=? class "ExpressionTool")
+ (error "Workflow class not implemented yet" class))
+ ((string=? class "Workflow")
+ (workflow-state (schedule-propnet (workflow-class->propnet name
+ cwl
+ scheduler
+ batch-system)
+ inputs)
+ (assoc-ref* cwl "outputs"))))))))
(define (poll state)
"Return current status and updated state of job @var{state} object. The status is
@@ -415,6 +421,26 @@ is the class of the workflow."
(scheduler schedule poll capture-output))
+(define (add-defaults inputs formal-inputs)
+ "Add default values from @var{formal-inputs} to @var{inputs}."
+ (vector-filter-map->list (lambda (formal-input)
+ (let* ((id (assoc-ref* formal-input "id"))
+ ;; Try
+ ;; - the input value
+ ;; - the default value
+ ;; - the null value (for optional inputs)
+ (value (or (assoc-ref inputs id)
+ (assoc-ref formal-input "default")
+ 'null))
+ (expected-type (formal-parameter-type
+ (assoc-ref* formal-input "type"))))
+ (unless (match-type value expected-type)
+ (user-error "Type mismatch for input `~a'; expected `~a' but got `~a'"
+ id expected-type (object-type value)))
+ (and (not (eq? value 'null))
+ (cons id value))))
+ formal-inputs))
+
(define (location->path location)
"Convert file @var{location} URI to path."
(if (string-prefix? "/" location)
@@ -564,35 +590,26 @@ endpoint to connect to. @var{slurm-jwt}, a string, is the JWT token to
authenticate to the slurm API with. @var{slurm-api-endpoint} and
@var{slurm-jwt} are only used when @var{batch-system} is
@code{'slurm-api}."
- (let ((inputs (resolve-inputs inputs (assoc-ref* cwl "inputs") store)))
- ;; Ensure required inputs are specified.
- (vector-for-each (lambda (input)
- (let ((input-id (assoc-ref input "id")))
- (unless (or (optional-input? input)
- (assoc input-id inputs))
- (user-error "Required input `~a' not specified"
- input-id))))
- (assoc-ref cwl "inputs"))
- (let ((scheduler (workflow-scheduler
- manifest scratch store batch-system
- #:guix-daemon-socket guix-daemon-socket
- #:slurm-api-endpoint slurm-api-endpoint
- #:slurm-jwt slurm-jwt)))
- (let loop ((state ((scheduler-schedule scheduler)
- (scheduler-proc name cwl %nothing %nothing)
- inputs
- scheduler)))
- ;; Poll.
- (let ((status state ((scheduler-poll scheduler) state)))
- (if (eq? status 'pending)
- (begin
- ;; Pause before looping and polling again so we don't bother the
- ;; job server too often.
- (sleep (case batch-system
- ;; Single machine jobs are run synchronously. So, there
- ;; is no need to wait to poll them.
- ((single-machine) 0)
- ((slurm-api) %job-poll-interval)))
- (loop state))
- ;; Capture outputs.
- ((scheduler-capture-output scheduler) state)))))))
+ (let ((scheduler (workflow-scheduler
+ manifest scratch store batch-system
+ #:guix-daemon-socket guix-daemon-socket
+ #:slurm-api-endpoint slurm-api-endpoint
+ #:slurm-jwt slurm-jwt)))
+ (let loop ((state ((scheduler-schedule scheduler)
+ (scheduler-proc name cwl %nothing %nothing)
+ inputs
+ scheduler)))
+ ;; Poll.
+ (let ((status state ((scheduler-poll scheduler) state)))
+ (if (eq? status 'pending)
+ (begin
+ ;; Pause before looping and polling again so we don't bother the
+ ;; job server too often.
+ (sleep (case batch-system
+ ;; Single machine jobs are run synchronously. So, there
+ ;; is no need to wait to poll them.
+ ((single-machine) 0)
+ ((slurm-api) %job-poll-interval)))
+ (loop state))
+ ;; Capture outputs.
+ ((scheduler-capture-output scheduler) state))))))