diff options
author | Arun Isaac | 2025-06-24 23:18:00 +0100 |
---|---|---|
committer | Arun Isaac | 2025-06-26 14:50:28 +0100 |
commit | 573f5a13dbf07e4dab0e06a5297875f5494545da (patch) | |
tree | 41f74191e4b0f72782b903631b0a25c3e77c31b7 | |
parent | bbf25f580d94a2ec467fb9ce011586bca6f8267d (diff) | |
download | ravanan-573f5a13dbf07e4dab0e06a5297875f5494545da.tar.gz ravanan-573f5a13dbf07e4dab0e06a5297875f5494545da.tar.lz ravanan-573f5a13dbf07e4dab0e06a5297875f5494545da.zip |
workflow: Build out propagator network eagerly.
Build out the propagator network eagerly, descending into the lowest
subworkflows. This will come in handy later on for building the G-exp
scripts ahead of time, before the workflow is run.
* ravanan/workflow.scm (<scheduler-proc>)[cwl]: Rename to
cwl-or-propnet.
[formal-inputs, formal-outputs]: New fields.
* ravanan/workflow.scm (workflow->scheduler-proc): New function.
(workflow-class->propnet): Build out propagator network eagerly.
(workflow-scheduler)[schedule]: Handle eagerly built-out propagator
network <scheduler-proc> objects.
(run-workflow): Use workflow->scheduler-proc to convert workflow to
<scheduler-proc> object.
-rw-r--r-- | ravanan/workflow.scm | 144 |
1 file changed, 89 insertions, 55 deletions
diff --git a/ravanan/workflow.scm b/ravanan/workflow.scm index 25a42c1..1d3578a 100644 --- a/ravanan/workflow.scm +++ b/ravanan/workflow.scm @@ -63,10 +63,12 @@ (inputs job-failure-inputs)) (define-immutable-record-type <scheduler-proc> - (scheduler-proc name cwl scatter scatter-method) + (scheduler-proc name cwl-or-propnet formal-inputs formal-outputs scatter scatter-method) scheduler-proc? (name scheduler-proc-name) - (cwl scheduler-proc-cwl) + (cwl-or-propnet scheduler-proc-cwl-or-propnet) + (formal-inputs scheduler-proc-formal-inputs) + (formal-outputs scheduler-proc-formal-outputs) (scatter scheduler-proc-scatter) (scatter-method scheduler-proc-scatter-method)) @@ -168,34 +170,62 @@ requirements and hints of the step." (assoc-ref* input "type")))) (assoc-ref input "id"))) -(define* (workflow-class->propnet name cwl scheduler batch-system) +(define (workflow->scheduler-proc name cwl scheduler batch-system + scatter scatter-method) + "Return a @code{<scheduler-proc>} object for @var{cwl} workflow named @var{name} +scheduled using @var{scheduler} on @var{batch-system}. @var{scatter} and +@var{scatter-method} are the CWL scattering properties of this step." + (scheduler-proc name + (let ((class (assoc-ref* cwl "class"))) + (cond + ((string=? class "CommandLineTool") + cwl) + ((string=? class "ExpressionTool") + (error "Workflow class not implemented yet" class)) + ((string=? class "Workflow") + (workflow-class->propnet cwl scheduler batch-system)) + (else + (assertion-violation class "Unexpected workflow class")))) + (assoc-ref* cwl "inputs") + (assoc-ref* cwl "outputs") + scatter + scatter-method)) + +(define* (workflow-class->propnet cwl scheduler batch-system) "Return a propagator network scheduled using @var{scheduler} on -@var{batch-system} for @var{cwl}, a @code{Workflow} class workflow with -@var{name}." +@var{batch-system} for @var{cwl}, a @code{Workflow} class workflow." (define (normalize-scatter-method scatter-method) (assoc-ref* '(("dotproduct" . 
dot-product) ("nested_crossproduct" . nested-cross-product) ("flat_crossproduct" . flat-cross-product)) scatter-method)) + (define (step->scheduler-proc step parent-requirements parent-hints) + (let ((run (assoc-ref* step "run"))) + (workflow->scheduler-proc (assoc-ref* step "id") + (inherit-requirements-and-hints + run + parent-requirements + parent-hints + (or (assoc-ref step "requirements") + #()) + (or (assoc-ref step "hints") + #())) + scheduler + batch-system + (maybe-assoc-ref (just step) "scatter") + (maybe-bind (maybe-assoc-ref (just step) "scatterMethod") + (compose just normalize-scatter-method))))) + (define (step->propagator step) (let ((step-id (assoc-ref* step "id")) (run (assoc-ref* step "run"))) (propagator step-id - (scheduler-proc step-id - (inherit-requirements-and-hints - run - (or (assoc-ref cwl "requirements") - #()) - (or (assoc-ref cwl "hints") - #()) - (or (assoc-ref step "requirements") - #()) - (or (assoc-ref step "hints") - #())) - (maybe-assoc-ref (just step) "scatter") - (maybe-bind (maybe-assoc-ref (just step) "scatterMethod") - (compose just normalize-scatter-method))) + (step->scheduler-proc step + (or (assoc-ref cwl "requirements") + #()) + (or (assoc-ref cwl "hints") + #())) (vector-map->list (lambda (input) (let ((input-id (assoc-ref input "id"))) (cons input-id @@ -234,10 +264,10 @@ requirements and hints of the step." #:key guix-daemon-socket) (define (schedule proc inputs scheduler) "Schedule @var{proc} with inputs from the @var{inputs} association list. Return a -state-monadic job state object. @var{proc} may either be a @code{<propnet>} -object or a @code{<scheduler-proc>} object." +state-monadic job state object. @var{proc} must be a @code{<scheduler-proc>} +object." 
(let* ((name (scheduler-proc-name proc)) - (cwl (scheduler-proc-cwl proc)) + (cwl-or-propnet (scheduler-proc-cwl-or-propnet proc)) (scatter (from-maybe (scheduler-proc-scatter proc) #f)) (scatter-method (from-maybe (scheduler-proc-scatter-method proc) @@ -248,7 +278,12 @@ object or a @code{<scheduler-proc>} object." (apply state-map (lambda input-elements ;; Recurse with scattered inputs spliced in. - (schedule (scheduler-proc name cwl %nothing %nothing) + (schedule (scheduler-proc name + cwl-or-propnet + (scheduler-proc-formal-inputs proc) + (scheduler-proc-formal-outputs proc) + %nothing + %nothing) ;; Replace scattered inputs with single ;; elements. (apply assoc-set @@ -264,39 +299,36 @@ object or a @code{<scheduler-proc>} object." ((nested-cross-product flat-cross-product) (error scatter-method "Scatter method not implemented yet"))) - (let* ((class (assoc-ref* cwl "class")) - (formal-inputs (assoc-ref* cwl "inputs")) - ;; We need to resolve inputs after adding defaults since the - ;; default values may contain uninterned File objects. - (inputs (resolve-inputs (add-defaults inputs formal-inputs) - formal-inputs - store))) - (cond - ((string=? class "CommandLineTool") - (state-let* ((job-state - (run-command-line-tool name - manifest-file - channels - cwl - inputs - scratch - store - batch-system - #:guix-daemon-socket guix-daemon-socket))) - (state-return (command-line-tool-state job-state - (assoc-ref* cwl "outputs"))))) - ((string=? class "ExpressionTool") - (error "Workflow class not implemented yet" class)) - ((string=? class "Workflow") - (state-let* ((propnet-state - (schedule-propnet (workflow-class->propnet name - cwl - scheduler - batch-system) - inputs))) + (if (propnet? 
cwl-or-propnet) + (state-let* ((propnet-state (schedule-propnet cwl-or-propnet inputs))) (state-return (workflow-state propnet-state - (assoc-ref* cwl "outputs")))))))))) + (scheduler-proc-formal-outputs proc)))) + (let* ((class (assoc-ref* cwl-or-propnet "class")) + (formal-inputs (scheduler-proc-formal-inputs proc)) + ;; We need to resolve inputs after adding defaults since + ;; the default values may contain uninterned File objects. + (inputs (resolve-inputs (add-defaults inputs formal-inputs) + formal-inputs + store))) + (cond + ((string=? class "CommandLineTool") + (state-let* ((job-state + (run-command-line-tool name + manifest-file + channels + cwl-or-propnet + inputs + scratch + store + batch-system + #:guix-daemon-socket guix-daemon-socket))) + (state-return (command-line-tool-state job-state + (scheduler-proc-formal-outputs proc))))) + ((string=? class "ExpressionTool") + (error "Workflow class not implemented yet" class)) + (else + (assertion-violation class "Unexpected workflow class")))))))) (define (poll state) "Return updated state and current status of job @var{state} object as a @@ -535,7 +567,9 @@ area need not be shared. @var{store} is the path to the shared ravanan store. #:guix-daemon-socket guix-daemon-socket))) (run-with-state (let loop ((mstate ((scheduler-schedule scheduler) - (scheduler-proc name cwl %nothing %nothing) + (workflow->scheduler-proc name cwl + scheduler batch-system + %nothing %nothing) inputs scheduler))) ;; Poll. |