From bd8c6f86014d49eb60ffa2df6d5eb99d988cbf25 Mon Sep 17 00:00:00 2001 From: Arun Isaac Date: Wed, 2 Oct 2024 12:52:17 +0100 Subject: workflow: Add default inputs before scheduling. We add default input values and resolve them before scheduling. Now that we do that, we no longer have to resolve inputs or check for required inputs in run-workflow. * ravanan/workflow.scm (add-defaults): New function. (workflow-scheduler)[schedule]: Add default inputs and resolve them before scheduling. (run-workflow): Do not resolve inputs or check for required inputs. --- ravanan/workflow.scm | 169 ++++++++++++++++++++++++++++----------------------- 1 file changed, 93 insertions(+), 76 deletions(-) diff --git a/ravanan/workflow.scm b/ravanan/workflow.scm index 989c711..42fa349 100644 --- a/ravanan/workflow.scm +++ b/ravanan/workflow.scm @@ -259,50 +259,56 @@ job state object. @var{proc} may either be a @code{} object or a (scatter-method (from-maybe (scheduler-proc-scatter-method proc) #f)) (class (assoc-ref* cwl "class"))) - (cond - (scatter - (case scatter-method - ((dot-product) - (apply vector-map - (lambda input-elements - ;; Recurse with scattered inputs spliced in. - (schedule (scheduler-proc name cwl %nothing %nothing) - ;; Replace scattered inputs with single - ;; elements. - (apply assoc-set - inputs - (map cons - (vector->list scatter) - input-elements)) - scheduler)) - ;; Extract values of scattered inputs. - (vector-map->list (cut assoc-ref inputs <>) - scatter))) - ((nested-cross-product flat-cross-product) - (error scatter-method - "Scatter method not implemented yet")))) - ((string=? class "CommandLineTool") - (command-line-tool-state - (run-command-line-tool name - manifest - cwl - inputs - scratch - store - batch-system - #:guix-daemon-socket guix-daemon-socket - #:slurm-api-endpoint slurm-api-endpoint - #:slurm-jwt slurm-jwt) - (assoc-ref* cwl "outputs"))) - ((string=? class "ExpressionTool") - (error "Workflow class not implemented yet" class)) - ((string=? class "Workflow") - (workflow-state (schedule-propnet (workflow-class->propnet name - cwl - scheduler - batch-system) - inputs) - (assoc-ref* cwl "outputs")))))) + (if scatter + (case scatter-method + ((dot-product) + (apply vector-map + (lambda input-elements + ;; Recurse with scattered inputs spliced in. + (schedule (scheduler-proc name cwl %nothing %nothing) + ;; Replace scattered inputs with single + ;; elements. + (apply assoc-set + inputs + (map cons + (vector->list scatter) + input-elements)) + scheduler)) + ;; Extract values of scattered inputs. + (vector-map->list (cut assoc-ref inputs <>) + scatter))) + ((nested-cross-product flat-cross-product) + (error scatter-method + "Scatter method not implemented yet"))) + (let* ((formal-inputs (assoc-ref* cwl "inputs")) + ;; We need to resolve inputs after adding defaults since the + ;; default values may contain partially specified File objects. + (inputs (resolve-inputs (add-defaults inputs formal-inputs) + formal-inputs + store))) + (cond + ((string=? class "CommandLineTool") + (command-line-tool-state + (run-command-line-tool name + manifest + cwl + inputs + scratch + store + batch-system + #:guix-daemon-socket guix-daemon-socket + #:slurm-api-endpoint slurm-api-endpoint + #:slurm-jwt slurm-jwt) + (assoc-ref* cwl "outputs"))) + ((string=? class "ExpressionTool") + (error "Workflow class not implemented yet" class)) + ((string=? class "Workflow") + (workflow-state (schedule-propnet (workflow-class->propnet name + cwl + scheduler + batch-system) + inputs) + (assoc-ref* cwl "outputs")))))))) (define (poll state) "Return current status and updated state of job @var{state} object. The status is @@ -415,6 +421,26 @@ is the class of the workflow." (scheduler schedule poll capture-output)) +(define (add-defaults inputs formal-inputs) + "Add default values from @var{formal-inputs} to @var{inputs}." + (vector-filter-map->list (lambda (formal-input) + (let* ((id (assoc-ref* formal-input "id")) + ;; Try + ;; - the input value + ;; - the default value + ;; - the null value (for optional inputs) + (value (or (assoc-ref inputs id) + (assoc-ref formal-input "default") + 'null)) + (expected-type (formal-parameter-type + (assoc-ref* formal-input "type")))) + (unless (match-type value expected-type) + (user-error "Type mismatch for input `~a'; expected `~a' but got `~a'" + id expected-type (object-type value))) + (and (not (eq? value 'null)) + (cons id value)))) + formal-inputs)) + (define (location->path location) "Convert file @var{location} URI to path." (if (string-prefix? "/" location) @@ -564,35 +590,26 @@ endpoint to connect to. @var{slurm-jwt}, a string, is the JWT token to authenticate to the slurm API with. @var{slurm-api-endpoint} and @var{slurm-jwt} are only used when @var{batch-system} is @code{'slurm-api}." - (let ((inputs (resolve-inputs inputs (assoc-ref* cwl "inputs") store))) - ;; Ensure required inputs are specified. - (vector-for-each (lambda (input) - (let ((input-id (assoc-ref input "id"))) - (unless (or (optional-input? input) - (assoc input-id inputs)) - (user-error "Required input `~a' not specified" - input-id)))) - (assoc-ref cwl "inputs")) - (let ((scheduler (workflow-scheduler - manifest scratch store batch-system - #:guix-daemon-socket guix-daemon-socket - #:slurm-api-endpoint slurm-api-endpoint - #:slurm-jwt slurm-jwt))) - (let loop ((state ((scheduler-schedule scheduler) - (scheduler-proc name cwl %nothing %nothing) - inputs - scheduler))) - ;; Poll. - (let ((status state ((scheduler-poll scheduler) state))) - (if (eq? status 'pending) - (begin - ;; Pause before looping and polling again so we don't bother the - ;; job server too often. - (sleep (case batch-system - ;; Single machine jobs are run synchronously. So, there - ;; is no need to wait to poll them. - ((single-machine) 0) - ((slurm-api) %job-poll-interval))) - (loop state)) - ;; Capture outputs. - ((scheduler-capture-output scheduler) state))))))) + (let ((scheduler (workflow-scheduler + manifest scratch store batch-system + #:guix-daemon-socket guix-daemon-socket + #:slurm-api-endpoint slurm-api-endpoint + #:slurm-jwt slurm-jwt))) + (let loop ((state ((scheduler-schedule scheduler) + (scheduler-proc name cwl %nothing %nothing) + inputs + scheduler))) + ;; Poll. + (let ((status state ((scheduler-poll scheduler) state))) + (if (eq? status 'pending) + (begin + ;; Pause before looping and polling again so we don't bother the + ;; job server too often. + (sleep (case batch-system + ;; Single machine jobs are run synchronously. So, there + ;; is no need to wait to poll them. + ((single-machine) 0) + ((slurm-api) %job-poll-interval))) + (loop state)) + ;; Capture outputs. + ((scheduler-capture-output scheduler) state)))))) -- cgit v1.2.3