From f331839f339c4366695c43f8def047e223099e7d Mon Sep 17 00:00:00 2001 From: Arun Isaac Date: Mon, 30 Sep 2024 02:54:41 +0100 Subject: workflow: Support subworkflows recursively. Implementing subworkflows recursively as sub-propnets is the correct and elegant ouroboros way. Earlier, we were flattening workflows into their constituent CommandLineTools steps. Among many other issues, this approach ran into problems when scattering over subworkflows. * ravanan/workflow.scm: Import assertion-violation from (rnrs base). (, ): New record types. (workflow-class->propnet): New function. (workflow-class->propagators, workflow->command-line-tool-steps): Delete functions. (workflow-scheduler): Represent state using and . Recursively evaluate compound propagators as propnets. (run-workflow): Use capture-output and poll from scheduler. --- ravanan/workflow.scm | 437 +++++++++++++++++++++++++++------------------------ 1 file changed, 232 insertions(+), 205 deletions(-) diff --git a/ravanan/workflow.scm b/ravanan/workflow.scm index 938ddf0..90c4d4c 100644 --- a/ravanan/workflow.scm +++ b/ravanan/workflow.scm @@ -17,6 +17,7 @@ ;;; along with ravanan. If not, see . (define-module (ravanan workflow) + #:use-module ((rnrs base) #:select (assertion-violation)) #:use-module ((rnrs conditions) #:select (define-condition-type)) #:use-module (rnrs exceptions) #:use-module (srfi srfi-1) @@ -60,6 +61,20 @@ (scatter scheduler-proc-scatter) (scatter-method scheduler-proc-scatter-method)) +(define-immutable-record-type + (command-line-tool-state job-state formal-outputs) + command-line-tool-state? + (job-state command-line-tool-state-job-state + set-command-line-tool-state-job-state) + (formal-outputs command-line-tool-state-formal-outputs)) + +(define-immutable-record-type + (workflow-state propnet-state formal-outputs) + workflow-state? + (propnet-state workflow-state-propnet-state + set-workflow-state-propnet-state) + (formal-outputs workflow-state-formal-outputs)) + (define (value=? maybe-val1 maybe-val2) "Return @code{#t} if maybe-monadic values @var{maybe-val1} and @var{maybe-val2} are the same value. Else, return @code{#f}." @@ -174,141 +189,117 @@ propagator." (assoc-ref output "id"))) (assoc-ref cwl "outputs")))) -(define (workflow->command-line-tool-steps cwl) - "Recursively traverse @var{cwl} tree and return a list of -@code{CommandLineTool} class steps." - (define (inherit-defaults step-run step-in) - "Augment the @var{step-run} CWL tree with defaults from the containing -workflow. @var{step-in} is the inputs mapping association list of the step." - (assoc-set step-run - (cons "inputs" - (vector-map (lambda (input) - (let* ((input-id (assoc-ref input "id")) - (input-source (assoc-ref step-in input-id))) - (maybe-assoc-set input - (cons "default" - (maybe-assoc-ref (just cwl) - "inputs" - input-source - "default"))))) - (assoc-ref* step-run "inputs"))))) - - (vector-append-map->list (lambda (step) - (let ((run (inherit-defaults (assoc-ref step "run") - (assoc-ref step "in")))) - (if (string=? (assoc-ref run "class") - "CommandLineTool") - (list (assoc-set step - (cons "run" run))) - (workflow->command-line-tool-steps +(define* (workflow-class->propnet name cwl scheduler) + "Return a propagator network scheduled using @var{scheduler} for @var{cwl}, a +@code{Workflow} class workflow with @var{name}." + (define (normalize-scatter-method scatter-method) + (assoc-ref* '(("dotproduct" . dot-product) + ("nested_crossproduct" . nested-cross-product) + ("flat_crossproduct" . flat-cross-product)) + scatter-method)) + + (define (step->propagator step) + (let* ((step-id (assoc-ref* step "id")) + (step-propagator + (command-line-tool->propagator step-id (assoc-ref* step "run")))) + (propagator (propagator-name step-propagator) + (let ((proc (propagator-proc step-propagator))) + (scheduler-proc (scheduler-proc-name proc) (inherit-requirements-and-hints - run + (scheduler-proc-cwl proc) (or (assoc-ref cwl "requirements") #()) (or (assoc-ref cwl "hints") #()) - (or (assoc-ref run "requirements") + (or (assoc-ref step "requirements") #()) - (or (assoc-ref run "hints") - #())))))) - (assoc-ref cwl "steps"))) - -(define* (workflow-class->propagators name cwl) - "Return a list of propagators for @var{cwl}, a @code{Workflow} class -workflow with @var{name}." - (define (prefix-name prefix name) - "Prefix @var{name} with @var{prefix} so as to keep subworkflows in -their own namespaces." - (string-append prefix "/" name)) - - (map (lambda (step) - (let* ((step-id (assoc-ref step "id")) - (step-propagator - (command-line-tool->propagator step-id - (assoc-ref step "run")))) - (propagator (prefix-name step-id - (propagator-name step-propagator)) - ;; Augment proc with scatter and scatter method. - (let ((proc (propagator-proc step-propagator))) - (scheduler-proc - (scheduler-proc-name proc) - (scheduler-proc-cwl proc) - (maybe-assoc-ref (just step) "scatter") - (maybe-let* ((scatter-method - (maybe-assoc-ref (just step) "scatterMethod"))) - (just (assoc-ref* '(("dotproduct" . dot-product) - ("nested_crossproduct" . nested-cross-product) - ("flat_crossproduct" . flat-cross-product)) - scatter-method))))) - (assoc-ref step "in") - (propagator-optional-inputs step-propagator) - (vector-map->list (lambda (output-name) - (cons output-name - (prefix-name step-id output-name))) - (assoc-ref step "out"))))) - (workflow->command-line-tool-steps cwl))) - -(define (workflow->propagators name cwl) - "Return a list of propagators for @var{cwl}, a workflow with -@var{name}." - (let ((class (assoc-ref cwl "class"))) - ((cond - ((string=? class "CommandLineTool") - (compose list command-line-tool->propagator)) - ((string=? class "ExpressionTool") - (error "Workflow class not implemented yet" class)) - ((string=? class "Workflow") - (maybe-let* ((requirements (maybe-assoc-ref (just cwl) "requirements"))) - (check-requirements requirements %supported-requirements)) - (maybe-let* ((hints (maybe-assoc-ref (just cwl) "hints"))) - (check-requirements hints %supported-requirements #t)) - workflow-class->propagators) - (else - (error "Invalid workflow class" class))) - name cwl))) + (or (assoc-ref step "hints") + #())) + (maybe-assoc-ref (just step) "scatter") + (maybe-bind (maybe-assoc-ref (just step) "scatterMethod") + (compose just normalize-scatter-method)))) + (map (match-lambda + ((input-id . _) + (cons input-id + (json-ref step "in" input-id)))) + (propagator-inputs step-propagator)) + (propagator-optional-inputs step-propagator) + (filter-map (match-lambda + ((output . cell) + (and (vector-member output + (assoc-ref* step "out")) + (cons output + (string-append step-id "/" cell))))) + (propagator-outputs step-propagator))))) + + (maybe-let* ((requirements (maybe-assoc-ref (just cwl) "requirements"))) + (check-requirements requirements + %supported-requirements)) + (maybe-let* ((hints (maybe-assoc-ref (just cwl) "hints"))) + (check-requirements hints + %supported-requirements + #t)) + (propnet (vector-map->list step->propagator + (assoc-ref* cwl "steps")) + value=? + merge-values + scheduler)) (define* (workflow-scheduler manifest scratch store batch-system #:key guix-daemon-socket slurm-api-endpoint slurm-jwt) - (define (schedule proc inputs) + (define (schedule proc inputs scheduler) "Schedule @var{proc} with inputs from the @var{inputs} association list. Return a -job state object." - (let ((name (scheduler-proc-name proc)) - (cwl (scheduler-proc-cwl proc)) - (scatter (from-maybe (scheduler-proc-scatter proc) - #f)) - (scatter-method (from-maybe (scheduler-proc-scatter-method proc) - #f))) - (if scatter - (case scatter-method - ((dot-product) - (apply vector-map - (lambda input-elements - ;; Recurse with scattered inputs spliced in. - (schedule (scheduler-proc name cwl %nothing %nothing) - ;; Replace scattered inputs with single - ;; elements. - (apply assoc-set - inputs - (map cons - (vector->list scatter) - input-elements)))) - ;; Extract values of scattered inputs. - (vector-map->list (cut assoc-ref inputs <>) - scatter))) - ((nested-cross-product flat-cross-product) - (error scatter-method - "Scatter method not implemented yet"))) - (run-command-line-tool name - manifest - cwl - inputs - scratch - store - batch-system - #:guix-daemon-socket guix-daemon-socket - #:slurm-api-endpoint slurm-api-endpoint - #:slurm-jwt slurm-jwt)))) +job state object. @var{proc} may either be a @code{} object or a +@code{} object." + (let* ((name (scheduler-proc-name proc)) + (cwl (scheduler-proc-cwl proc)) + (scatter (from-maybe (scheduler-proc-scatter proc) + #f)) + (scatter-method (from-maybe (scheduler-proc-scatter-method proc) + #f)) + (class (assoc-ref* cwl "class"))) + (cond + (scatter + (case scatter-method + ((dot-product) + (apply vector-map + (lambda input-elements + ;; Recurse with scattered inputs spliced in. + (schedule (scheduler-proc name cwl %nothing %nothing) + ;; Replace scattered inputs with single + ;; elements. + (apply assoc-set + inputs + (map cons + (vector->list scatter) + input-elements)) + scheduler)) + ;; Extract values of scattered inputs. + (vector-map->list (cut assoc-ref inputs <>) + scatter))) + ((nested-cross-product flat-cross-product) + (error scatter-method + "Scatter method not implemented yet")))) + ((string=? class "CommandLineTool") + (command-line-tool-state + (run-command-line-tool name + manifest + cwl + inputs + scratch + store + batch-system + #:guix-daemon-socket guix-daemon-socket + #:slurm-api-endpoint slurm-api-endpoint + #:slurm-jwt slurm-jwt) + (assoc-ref* cwl "outputs"))) + ((string=? class "ExpressionTool") + (error "Workflow class not implemented yet" class)) + ((string=? class "Workflow") + (workflow-state (schedule-propnet (workflow-class->propnet name cwl scheduler) + inputs) + (assoc-ref* cwl "outputs")))))) (define (poll state) "Return current status and updated state of job @var{state} object. The status is @@ -321,40 +312,103 @@ exit if job has failed." script (script->store-stdout-file script store) (script->store-stderr-file script store))))) - (let ((status state (job-state-status state - #:slurm-api-endpoint slurm-api-endpoint - #:slurm-jwt slurm-jwt))) - (values (case status - ((failed) - (raise-exception (job-failure (job-state-script state)))) - (else => identity)) - state)))) + (cond + ;; Return vector states as completed only if all state elements in it are + ;; completed. + ((vector? state) + (let ((status state (vector-mapn poll state))) + (values (if (vector-every (cut eq? <> 'completed) + status) + 'completed + 'pending) + state))) + ;; Poll job state. Raise an exception if the job has failed. + ((command-line-tool-state? state) + (let ((status updated-job-state + (job-state-status (command-line-tool-state-job-state state) + #:slurm-api-endpoint slurm-api-endpoint + #:slurm-jwt slurm-jwt))) + (values (case status + ((failed) + (raise-exception (job-failure + (job-state-script + (command-line-tool-state-job-state state))))) + (else => identity)) + (set-command-line-tool-state-job-state state updated-job-state)))) + ;; Poll sub-workflow state. We do not need to check the status here since + ;; job failures only occur at the level of a CommandLineTool. + ((workflow-state? state) + (let ((status updated-propnet-state + (poll-propnet (workflow-state-propnet-state state)))) + (values status + (set-workflow-state-propnet-state state updated-propnet-state)))) + (else + (assertion-violation state "Invalid state"))))) + + (define (find-output class outputs formal-output) + "Find @var{formal-output} among @var{outputs}. @var{class} is the class of the +workflow." + (let ((output-id (assoc-ref formal-output "id"))) + (cond + ;; The output is present; cons and return it. + ((cond + ((string=? class "CommandLineTool") + (assoc-ref outputs output-id)) + ((string=? class "ExpressionTool") + (error "Workflow class not implemented yet" + class)) + ((string=? class "Workflow") + (assoc-ref outputs + (assoc-ref* formal-output "outputSource")))) + => (cut cons output-id <>)) + ;; The output is absent; check if a null type is acceptable. + ((match-type 'null + (formal-parameter-type (assoc-ref* formal-output "type"))) + #f) + ;; Else, error out. + (else + (error "output not found" output-id))))) + + (define (filter-outputs class outputs formal-outputs) + "Filter @var{outputs} to only have outputs from @var{formal-outputs}. @var{class} +is the class of the workflow." + (vector-filter-map->list (cut find-output class outputs <>) + formal-outputs)) (define (capture-output state) "Return output of completed job @var{state}." - (if (vector? state) - ;; Combine outputs from individual state elements. - (match (vector-map capture-output state) - ((and #(head-output _ ...) - outputs) - (map (match-lambda - ((id . value) - (cons id - (vector-map (lambda (output) - ;; FIXME: Is this the correct way to - ;; handle missing outputs? - (or (assoc-ref output id) - 'null)) - outputs)))) - head-output))) - ;; Log progress and return captured output. - (let ((script (job-state-script state))) - (format (current-error-port) - "~a completed; logs at ~a and ~a~%" - script - (script->store-stdout-file script store) - (script->store-stderr-file script store)) - (capture-command-line-tool-output script store)))) + (cond + ((workflow-state? state) + (filter-outputs "Workflow" + (capture-propnet-output + (workflow-state-propnet-state state)) + (workflow-state-formal-outputs state))) + ((vector? state) + ;; Combine outputs from individual state elements. + (match (vector-map capture-output state) + ((and #(head-output _ ...) + outputs) + (map (match-lambda + ((id . value) + (cons id + (vector-map (lambda (output) + ;; FIXME: Is this the correct way to + ;; handle missing outputs? + (or (assoc-ref output id) + 'null)) + outputs)))) + head-output)))) + (else + ;; Log progress and return captured output. + (let ((script (job-state-script (command-line-tool-state-job-state state)))) + (format (current-error-port) + "~a completed; logs at ~a and ~a~%" + script + (script->store-stdout-file script store) + (script->store-stderr-file script store)) + (filter-outputs "CommandLineTool" + (capture-command-line-tool-output script store) + (command-line-tool-state-formal-outputs state)))))) (scheduler schedule poll capture-output)) @@ -507,29 +561,6 @@ endpoint to connect to. @var{slurm-jwt}, a string, is the JWT token to authenticate to the slurm API with. @var{slurm-api-endpoint} and @var{slurm-jwt} are only used when @var{batch-system} is @code{'slurm-api}." - (define (capture-output cell-values output) - (let ((output-id (assoc-ref output "id"))) - (cond - ;; The output is present; cons and return it. - ((let ((class (assoc-ref* cwl "class"))) - (cond - ((string=? class "CommandLineTool") - (assoc-ref cell-values output-id)) - ((string=? class "ExpressionTool") - (error "Workflow class not implemented yet" - class)) - ((string=? class "Workflow") - (assoc-ref cell-values - (assoc-ref* output "outputSource"))))) - => (cut cons output-id <>)) - ;; The output is absent; check if a null type is acceptable. - ((match-type 'null - (formal-parameter-type (assoc-ref* output "type"))) - #f) - ;; Else, error out. - (else - (error "output not found" output-id))))) - (let ((inputs (resolve-inputs inputs (assoc-ref* cwl "inputs") store))) ;; Ensure required inputs are specified. (vector-for-each (lambda (input) @@ -539,30 +570,26 @@ authenticate to the slurm API with. @var{slurm-api-endpoint} and (user-error "Required input `~a' not specified" input-id)))) (assoc-ref cwl "inputs")) - (let loop ((state (schedule-propnet - (propnet (workflow->propagators name cwl) - value=? - merge-values - (workflow-scheduler - manifest scratch store batch-system - #:guix-daemon-socket guix-daemon-socket - #:slurm-api-endpoint slurm-api-endpoint - #:slurm-jwt slurm-jwt)) - inputs))) - ;; Poll. - (let ((status state (poll-propnet state))) - (if (eq? status 'pending) - (begin - ;; Pause before looping and polling again so we don't bother the job - ;; server too often. - (sleep (case batch-system - ;; Single machine jobs are run synchronously. So, there is - ;; no need to wait to poll them. - ((single-machine) 0) - ((slurm-api) %job-poll-interval))) - (loop state)) - ;; Capture outputs. - (vector-filter-map->list (cute capture-output - (capture-propnet-output state) - <>) - (assoc-ref* cwl "outputs"))))))) + (let ((scheduler (workflow-scheduler + manifest scratch store batch-system + #:guix-daemon-socket guix-daemon-socket + #:slurm-api-endpoint slurm-api-endpoint + #:slurm-jwt slurm-jwt))) + (let loop ((state ((scheduler-schedule scheduler) + (scheduler-proc name cwl %nothing %nothing) + inputs + scheduler))) + ;; Poll. + (let ((status state ((scheduler-poll scheduler) state))) + (if (eq? status 'pending) + (begin + ;; Pause before looping and polling again so we don't bother the + ;; job server too often. + (sleep (case batch-system + ;; Single machine jobs are run synchronously. So, there + ;; is no need to wait to poll them. + ((single-machine) 0) + ((slurm-api) %job-poll-interval))) + (loop state)) + ;; Capture outputs. + ((scheduler-capture-output scheduler) state))))))) -- cgit v1.2.3