diff options
-rw-r--r-- | ravanan/workflow.scm | 437 |
1 files changed, 232 insertions, 205 deletions
diff --git a/ravanan/workflow.scm b/ravanan/workflow.scm index 938ddf0..90c4d4c 100644 --- a/ravanan/workflow.scm +++ b/ravanan/workflow.scm @@ -17,6 +17,7 @@ ;;; along with ravanan. If not, see <https://www.gnu.org/licenses/>. (define-module (ravanan workflow) + #:use-module ((rnrs base) #:select (assertion-violation)) #:use-module ((rnrs conditions) #:select (define-condition-type)) #:use-module (rnrs exceptions) #:use-module (srfi srfi-1) @@ -60,6 +61,20 @@ (scatter scheduler-proc-scatter) (scatter-method scheduler-proc-scatter-method)) +(define-immutable-record-type <command-line-tool-state> + (command-line-tool-state job-state formal-outputs) + command-line-tool-state? + (job-state command-line-tool-state-job-state + set-command-line-tool-state-job-state) + (formal-outputs command-line-tool-state-formal-outputs)) + +(define-immutable-record-type <workflow-state> + (workflow-state propnet-state formal-outputs) + workflow-state? + (propnet-state workflow-state-propnet-state + set-workflow-state-propnet-state) + (formal-outputs workflow-state-formal-outputs)) + (define (value=? maybe-val1 maybe-val2) "Return @code{#t} if maybe-monadic values @var{maybe-val1} and @var{maybe-val2} are the same value. Else, return @code{#f}." @@ -174,141 +189,117 @@ propagator." (assoc-ref output "id"))) (assoc-ref cwl "outputs")))) -(define (workflow->command-line-tool-steps cwl) - "Recursively traverse @var{cwl} tree and return a list of -@code{CommandLineTool} class steps." - (define (inherit-defaults step-run step-in) - "Augment the @var{step-run} CWL tree with defaults from the containing -workflow. @var{step-in} is the inputs mapping association list of the step." - (assoc-set step-run - (cons "inputs" - (vector-map (lambda (input) - (let* ((input-id (assoc-ref input "id")) - (input-source (assoc-ref step-in input-id))) - (maybe-assoc-set input - (cons "default" - (maybe-assoc-ref (just cwl) - "inputs" - input-source - "default"))))) - (assoc-ref* step-run "inputs"))))) - - (vector-append-map->list (lambda (step) - (let ((run (inherit-defaults (assoc-ref step "run") - (assoc-ref step "in")))) - (if (string=? (assoc-ref run "class") - "CommandLineTool") - (list (assoc-set step - (cons "run" run))) - (workflow->command-line-tool-steps +(define* (workflow-class->propnet name cwl scheduler) + "Return a propagator network scheduled using @var{scheduler} for @var{cwl}, a +@code{Workflow} class workflow with @var{name}." + (define (normalize-scatter-method scatter-method) + (assoc-ref* '(("dotproduct" . dot-product) + ("nested_crossproduct" . nested-cross-product) + ("flat_crossproduct" . flat-cross-product)) + scatter-method)) + + (define (step->propagator step) + (let* ((step-id (assoc-ref* step "id")) + (step-propagator + (command-line-tool->propagator step-id (assoc-ref* step "run")))) + (propagator (propagator-name step-propagator) + (let ((proc (propagator-proc step-propagator))) + (scheduler-proc (scheduler-proc-name proc) (inherit-requirements-and-hints - run + (scheduler-proc-cwl proc) (or (assoc-ref cwl "requirements") #()) (or (assoc-ref cwl "hints") #()) - (or (assoc-ref run "requirements") + (or (assoc-ref step "requirements") #()) - (or (assoc-ref run "hints") - #())))))) - (assoc-ref cwl "steps"))) - -(define* (workflow-class->propagators name cwl) - "Return a list of propagators for @var{cwl}, a @code{Workflow} class -workflow with @var{name}." - (define (prefix-name prefix name) - "Prefix @var{name} with @var{prefix} so as to keep subworkflows in -their own namespaces." - (string-append prefix "/" name)) - - (map (lambda (step) - (let* ((step-id (assoc-ref step "id")) - (step-propagator - (command-line-tool->propagator step-id - (assoc-ref step "run")))) - (propagator (prefix-name step-id - (propagator-name step-propagator)) - ;; Augment proc with scatter and scatter method. - (let ((proc (propagator-proc step-propagator))) - (scheduler-proc - (scheduler-proc-name proc) - (scheduler-proc-cwl proc) - (maybe-assoc-ref (just step) "scatter") - (maybe-let* ((scatter-method - (maybe-assoc-ref (just step) "scatterMethod"))) - (just (assoc-ref* '(("dotproduct" . dot-product) - ("nested_crossproduct" . nested-cross-product) - ("flat_crossproduct" . flat-cross-product)) - scatter-method))))) - (assoc-ref step "in") - (propagator-optional-inputs step-propagator) - (vector-map->list (lambda (output-name) - (cons output-name - (prefix-name step-id output-name))) - (assoc-ref step "out"))))) - (workflow->command-line-tool-steps cwl))) - -(define (workflow->propagators name cwl) - "Return a list of propagators for @var{cwl}, a workflow with -@var{name}." - (let ((class (assoc-ref cwl "class"))) - ((cond - ((string=? class "CommandLineTool") - (compose list command-line-tool->propagator)) - ((string=? class "ExpressionTool") - (error "Workflow class not implemented yet" class)) - ((string=? class "Workflow") - (maybe-let* ((requirements (maybe-assoc-ref (just cwl) "requirements"))) - (check-requirements requirements %supported-requirements)) - (maybe-let* ((hints (maybe-assoc-ref (just cwl) "hints"))) - (check-requirements hints %supported-requirements #t)) - workflow-class->propagators) - (else - (error "Invalid workflow class" class))) - name cwl))) + (or (assoc-ref step "hints") + #())) + (maybe-assoc-ref (just step) "scatter") + (maybe-bind (maybe-assoc-ref (just step) "scatterMethod") + (compose just normalize-scatter-method)))) + (map (match-lambda + ((input-id . _) + (cons input-id + (json-ref step "in" input-id)))) + (propagator-inputs step-propagator)) + (propagator-optional-inputs step-propagator) + (filter-map (match-lambda + ((output . cell) + (and (vector-member output + (assoc-ref* step "out")) + (cons output + (string-append step-id "/" cell))))) + (propagator-outputs step-propagator))))) + + (maybe-let* ((requirements (maybe-assoc-ref (just cwl) "requirements"))) + (check-requirements requirements + %supported-requirements)) + (maybe-let* ((hints (maybe-assoc-ref (just cwl) "hints"))) + (check-requirements hints + %supported-requirements + #t)) + (propnet (vector-map->list step->propagator + (assoc-ref* cwl "steps")) + value=? + merge-values + scheduler)) (define* (workflow-scheduler manifest scratch store batch-system #:key guix-daemon-socket slurm-api-endpoint slurm-jwt) - (define (schedule proc inputs) + (define (schedule proc inputs scheduler) "Schedule @var{proc} with inputs from the @var{inputs} association list. Return a -job state object." - (let ((name (scheduler-proc-name proc)) - (cwl (scheduler-proc-cwl proc)) - (scatter (from-maybe (scheduler-proc-scatter proc) - #f)) - (scatter-method (from-maybe (scheduler-proc-scatter-method proc) - #f))) - (if scatter - (case scatter-method - ((dot-product) - (apply vector-map - (lambda input-elements - ;; Recurse with scattered inputs spliced in. - (schedule (scheduler-proc name cwl %nothing %nothing) - ;; Replace scattered inputs with single - ;; elements. - (apply assoc-set - inputs - (map cons - (vector->list scatter) - input-elements)))) - ;; Extract values of scattered inputs. - (vector-map->list (cut assoc-ref inputs <>) - scatter))) - ((nested-cross-product flat-cross-product) - (error scatter-method - "Scatter method not implemented yet"))) - (run-command-line-tool name - manifest - cwl - inputs - scratch - store - batch-system - #:guix-daemon-socket guix-daemon-socket - #:slurm-api-endpoint slurm-api-endpoint - #:slurm-jwt slurm-jwt)))) +job state object. @var{proc} may either be a @code{<propnet>} object or a +@code{<scheduler-proc>} object." + (let* ((name (scheduler-proc-name proc)) + (cwl (scheduler-proc-cwl proc)) + (scatter (from-maybe (scheduler-proc-scatter proc) + #f)) + (scatter-method (from-maybe (scheduler-proc-scatter-method proc) + #f)) + (class (assoc-ref* cwl "class"))) + (cond + (scatter + (case scatter-method + ((dot-product) + (apply vector-map + (lambda input-elements + ;; Recurse with scattered inputs spliced in. + (schedule (scheduler-proc name cwl %nothing %nothing) + ;; Replace scattered inputs with single + ;; elements. + (apply assoc-set + inputs + (map cons + (vector->list scatter) + input-elements)) + scheduler)) + ;; Extract values of scattered inputs. + (vector-map->list (cut assoc-ref inputs <>) + scatter))) + ((nested-cross-product flat-cross-product) + (error scatter-method + "Scatter method not implemented yet")))) + ((string=? class "CommandLineTool") + (command-line-tool-state + (run-command-line-tool name + manifest + cwl + inputs + scratch + store + batch-system + #:guix-daemon-socket guix-daemon-socket + #:slurm-api-endpoint slurm-api-endpoint + #:slurm-jwt slurm-jwt) + (assoc-ref* cwl "outputs"))) + ((string=? class "ExpressionTool") + (error "Workflow class not implemented yet" class)) + ((string=? class "Workflow") + (workflow-state (schedule-propnet (workflow-class->propnet name cwl scheduler) + inputs) + (assoc-ref* cwl "outputs")))))) (define (poll state) "Return current status and updated state of job @var{state} object. The status is @@ -321,40 +312,103 @@ exit if job has failed." script (script->store-stdout-file script store) (script->store-stderr-file script store))))) - (let ((status state (job-state-status state - #:slurm-api-endpoint slurm-api-endpoint - #:slurm-jwt slurm-jwt))) - (values (case status - ((failed) - (raise-exception (job-failure (job-state-script state)))) - (else => identity)) - state)))) + (cond + ;; Return vector states as completed only if all state elements in it are + ;; completed. + ((vector? state) + (let ((status state (vector-mapn poll state))) + (values (if (vector-every (cut eq? <> 'completed) + status) + 'completed + 'pending) + state))) + ;; Poll job state. Raise an exception if the job has failed. + ((command-line-tool-state? state) + (let ((status updated-job-state + (job-state-status (command-line-tool-state-job-state state) + #:slurm-api-endpoint slurm-api-endpoint + #:slurm-jwt slurm-jwt))) + (values (case status + ((failed) + (raise-exception (job-failure + (job-state-script + (command-line-tool-state-job-state state))))) + (else => identity)) + (set-command-line-tool-state-job-state state updated-job-state)))) + ;; Poll sub-workflow state. We do not need to check the status here since + ;; job failures only occur at the level of a CommandLineTool. + ((workflow-state? state) + (let ((status updated-propnet-state + (poll-propnet (workflow-state-propnet-state state)))) + (values status + (set-workflow-state-propnet-state state updated-propnet-state)))) + (else + (assertion-violation state "Invalid state"))))) + + (define (find-output class outputs formal-output) + "Find @var{formal-output} among @var{outputs}. @var{class} is the class of the +workflow." + (let ((output-id (assoc-ref formal-output "id"))) + (cond + ;; The output is present; cons and return it. + ((cond + ((string=? class "CommandLineTool") + (assoc-ref outputs output-id)) + ((string=? class "ExpressionTool") + (error "Workflow class not implemented yet" + class)) + ((string=? class "Workflow") + (assoc-ref outputs + (assoc-ref* formal-output "outputSource")))) + => (cut cons output-id <>)) + ;; The output is absent; check if a null type is acceptable. + ((match-type 'null + (formal-parameter-type (assoc-ref* formal-output "type"))) + #f) + ;; Else, error out. + (else + (error "output not found" output-id))))) + + (define (filter-outputs class outputs formal-outputs) + "Filter @var{outputs} to only have outputs from @var{formal-outputs}. @var{class} +is the class of the workflow." + (vector-filter-map->list (cut find-output class outputs <>) + formal-outputs)) (define (capture-output state) "Return output of completed job @var{state}." - (if (vector? state) - ;; Combine outputs from individual state elements. - (match (vector-map capture-output state) - ((and #(head-output _ ...) - outputs) - (map (match-lambda - ((id . value) - (cons id - (vector-map (lambda (output) - ;; FIXME: Is this the correct way to - ;; handle missing outputs? - (or (assoc-ref output id) - 'null)) - outputs)))) - head-output))) - ;; Log progress and return captured output. - (let ((script (job-state-script state))) - (format (current-error-port) - "~a completed; logs at ~a and ~a~%" - script - (script->store-stdout-file script store) - (script->store-stderr-file script store)) - (capture-command-line-tool-output script store)))) + (cond + ((workflow-state? state) + (filter-outputs "Workflow" + (capture-propnet-output + (workflow-state-propnet-state state)) + (workflow-state-formal-outputs state))) + ((vector? state) + ;; Combine outputs from individual state elements. + (match (vector-map capture-output state) + ((and #(head-output _ ...) + outputs) + (map (match-lambda + ((id . value) + (cons id + (vector-map (lambda (output) + ;; FIXME: Is this the correct way to + ;; handle missing outputs? + (or (assoc-ref output id) + 'null)) + outputs)))) + head-output)))) + (else + ;; Log progress and return captured output. + (let ((script (job-state-script (command-line-tool-state-job-state state)))) + (format (current-error-port) + "~a completed; logs at ~a and ~a~%" + script + (script->store-stdout-file script store) + (script->store-stderr-file script store)) + (filter-outputs "CommandLineTool" + (capture-command-line-tool-output script store) + (command-line-tool-state-formal-outputs state)))))) (scheduler schedule poll capture-output)) @@ -507,29 +561,6 @@ endpoint to connect to. @var{slurm-jwt}, a string, is the JWT token to authenticate to the slurm API with. @var{slurm-api-endpoint} and @var{slurm-jwt} are only used when @var{batch-system} is @code{'slurm-api}." - (define (capture-output cell-values output) - (let ((output-id (assoc-ref output "id"))) - (cond - ;; The output is present; cons and return it. - ((let ((class (assoc-ref* cwl "class"))) - (cond - ((string=? class "CommandLineTool") - (assoc-ref cell-values output-id)) - ((string=? class "ExpressionTool") - (error "Workflow class not implemented yet" - class)) - ((string=? class "Workflow") - (assoc-ref cell-values - (assoc-ref* output "outputSource"))))) - => (cut cons output-id <>)) - ;; The output is absent; check if a null type is acceptable. - ((match-type 'null - (formal-parameter-type (assoc-ref* output "type"))) - #f) - ;; Else, error out. - (else - (error "output not found" output-id))))) - (let ((inputs (resolve-inputs inputs (assoc-ref* cwl "inputs") store))) ;; Ensure required inputs are specified. (vector-for-each (lambda (input) @@ -539,30 +570,26 @@ authenticate to the slurm API with. @var{slurm-api-endpoint} and (user-error "Required input `~a' not specified" input-id)))) (assoc-ref cwl "inputs")) - (let loop ((state (schedule-propnet - (propnet (workflow->propagators name cwl) - value=? - merge-values - (workflow-scheduler - manifest scratch store batch-system - #:guix-daemon-socket guix-daemon-socket - #:slurm-api-endpoint slurm-api-endpoint - #:slurm-jwt slurm-jwt)) - inputs))) - ;; Poll. - (let ((status state (poll-propnet state))) - (if (eq? status 'pending) - (begin - ;; Pause before looping and polling again so we don't bother the job - ;; server too often. - (sleep (case batch-system - ;; Single machine jobs are run synchronously. So, there is - ;; no need to wait to poll them. - ((single-machine) 0) - ((slurm-api) %job-poll-interval))) - (loop state)) - ;; Capture outputs. - (vector-filter-map->list (cute capture-output - (capture-propnet-output state) - <>) - (assoc-ref* cwl "outputs"))))))) + (let ((scheduler (workflow-scheduler + manifest scratch store batch-system + #:guix-daemon-socket guix-daemon-socket + #:slurm-api-endpoint slurm-api-endpoint + #:slurm-jwt slurm-jwt))) + (let loop ((state ((scheduler-schedule scheduler) + (scheduler-proc name cwl %nothing %nothing) + inputs + scheduler))) + ;; Poll. + (let ((status state ((scheduler-poll scheduler) state))) + (if (eq? status 'pending) + (begin + ;; Pause before looping and polling again so we don't bother the + ;; job server too often. + (sleep (case batch-system + ;; Single machine jobs are run synchronously. So, there + ;; is no need to wait to poll them. + ((single-machine) 0) + ((slurm-api) %job-poll-interval))) + (loop state)) + ;; Capture outputs. + ((scheduler-capture-output scheduler) state))))))) |