aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArun Isaac2025-06-24 23:18:00 +0100
committerArun Isaac2025-06-26 14:50:28 +0100
commit573f5a13dbf07e4dab0e06a5297875f5494545da (patch)
tree41f74191e4b0f72782b903631b0a25c3e77c31b7
parentbbf25f580d94a2ec467fb9ce011586bca6f8267d (diff)
downloadravanan-573f5a13dbf07e4dab0e06a5297875f5494545da.tar.gz
ravanan-573f5a13dbf07e4dab0e06a5297875f5494545da.tar.lz
ravanan-573f5a13dbf07e4dab0e06a5297875f5494545da.zip
workflow: Build out propagator network eagerly.
Build out propagator network eagerly, descending into the lowest subworkflows. This will come in handy later on to build the G-exp scripts ahead of time before the workflow is run. * ravanan/workflow.scm (<scheduler-proc>)[cwl]: Rename to cwl-or-propnet. [formal-inputs, formal-outputs]: New fields. * ravanan/workflow.scm (workflow->scheduler-proc): New function. (workflow-class->propnet): Build out propagator network eagerly. (workflow-scheduler)[schedule]: Handle eagerly built-out propagator network <scheduler-proc> objects. (run-workflow): Use workflow->scheduler-proc to convert workflow to <scheduler-proc> object.
-rw-r--r--ravanan/workflow.scm144
1 files changed, 89 insertions, 55 deletions
diff --git a/ravanan/workflow.scm b/ravanan/workflow.scm
index 25a42c1..1d3578a 100644
--- a/ravanan/workflow.scm
+++ b/ravanan/workflow.scm
@@ -63,10 +63,12 @@
(inputs job-failure-inputs))
(define-immutable-record-type <scheduler-proc>
- (scheduler-proc name cwl scatter scatter-method)
+ (scheduler-proc name cwl-or-propnet formal-inputs formal-outputs scatter scatter-method)
scheduler-proc?
(name scheduler-proc-name)
- (cwl scheduler-proc-cwl)
+ (cwl-or-propnet scheduler-proc-cwl-or-propnet)
+ (formal-inputs scheduler-proc-formal-inputs)
+ (formal-outputs scheduler-proc-formal-outputs)
(scatter scheduler-proc-scatter)
(scatter-method scheduler-proc-scatter-method))
@@ -168,34 +170,62 @@ requirements and hints of the step."
(assoc-ref* input "type"))))
(assoc-ref input "id")))
-(define* (workflow-class->propnet name cwl scheduler batch-system)
+(define (workflow->scheduler-proc name cwl scheduler batch-system
+ scatter scatter-method)
+ "Return a @code{<scheduler-proc>} object for @var{cwl} workflow named @var{name}
+scheduled using @var{scheduler} on @var{batch-system}. @var{scatter} and
+@var{scatter-method} are the CWL scattering properties of this step."
+ (scheduler-proc name
+ (let ((class (assoc-ref* cwl "class")))
+ (cond
+ ((string=? class "CommandLineTool")
+ cwl)
+ ((string=? class "ExpressionTool")
+ (error "Workflow class not implemented yet" class))
+ ((string=? class "Workflow")
+ (workflow-class->propnet cwl scheduler batch-system))
+ (else
+ (assertion-violation class "Unexpected workflow class"))))
+ (assoc-ref* cwl "inputs")
+ (assoc-ref* cwl "outputs")
+ scatter
+ scatter-method))
+
+(define* (workflow-class->propnet cwl scheduler batch-system)
"Return a propagator network scheduled using @var{scheduler} on
-@var{batch-system} for @var{cwl}, a @code{Workflow} class workflow with
-@var{name}."
+@var{batch-system} for @var{cwl}, a @code{Workflow} class workflow."
(define (normalize-scatter-method scatter-method)
(assoc-ref* '(("dotproduct" . dot-product)
("nested_crossproduct" . nested-cross-product)
("flat_crossproduct" . flat-cross-product))
scatter-method))
+ (define (step->scheduler-proc step parent-requirements parent-hints)
+ (let ((run (assoc-ref* step "run")))
+ (workflow->scheduler-proc (assoc-ref* step "id")
+ (inherit-requirements-and-hints
+ run
+ parent-requirements
+ parent-hints
+ (or (assoc-ref step "requirements")
+ #())
+ (or (assoc-ref step "hints")
+ #()))
+ scheduler
+ batch-system
+ (maybe-assoc-ref (just step) "scatter")
+ (maybe-bind (maybe-assoc-ref (just step) "scatterMethod")
+ (compose just normalize-scatter-method)))))
+
(define (step->propagator step)
(let ((step-id (assoc-ref* step "id"))
(run (assoc-ref* step "run")))
(propagator step-id
- (scheduler-proc step-id
- (inherit-requirements-and-hints
- run
- (or (assoc-ref cwl "requirements")
- #())
- (or (assoc-ref cwl "hints")
- #())
- (or (assoc-ref step "requirements")
- #())
- (or (assoc-ref step "hints")
- #()))
- (maybe-assoc-ref (just step) "scatter")
- (maybe-bind (maybe-assoc-ref (just step) "scatterMethod")
- (compose just normalize-scatter-method)))
+ (step->scheduler-proc step
+ (or (assoc-ref cwl "requirements")
+ #())
+ (or (assoc-ref cwl "hints")
+ #()))
(vector-map->list (lambda (input)
(let ((input-id (assoc-ref input "id")))
(cons input-id
@@ -234,10 +264,10 @@ requirements and hints of the step."
#:key guix-daemon-socket)
(define (schedule proc inputs scheduler)
"Schedule @var{proc} with inputs from the @var{inputs} association list. Return a
-state-monadic job state object. @var{proc} may either be a @code{<propnet>}
-object or a @code{<scheduler-proc>} object."
+state-monadic job state object. @var{proc} must be a @code{<scheduler-proc>}
+object."
(let* ((name (scheduler-proc-name proc))
- (cwl (scheduler-proc-cwl proc))
+ (cwl-or-propnet (scheduler-proc-cwl-or-propnet proc))
(scatter (from-maybe (scheduler-proc-scatter proc)
#f))
(scatter-method (from-maybe (scheduler-proc-scatter-method proc)
@@ -248,7 +278,12 @@ object or a @code{<scheduler-proc>} object."
(apply state-map
(lambda input-elements
;; Recurse with scattered inputs spliced in.
- (schedule (scheduler-proc name cwl %nothing %nothing)
+ (schedule (scheduler-proc name
+ cwl-or-propnet
+ (scheduler-proc-formal-inputs proc)
+ (scheduler-proc-formal-outputs proc)
+ %nothing
+ %nothing)
;; Replace scattered inputs with single
;; elements.
(apply assoc-set
@@ -264,39 +299,36 @@ object or a @code{<scheduler-proc>} object."
((nested-cross-product flat-cross-product)
(error scatter-method
"Scatter method not implemented yet")))
- (let* ((class (assoc-ref* cwl "class"))
- (formal-inputs (assoc-ref* cwl "inputs"))
- ;; We need to resolve inputs after adding defaults since the
- ;; default values may contain uninterned File objects.
- (inputs (resolve-inputs (add-defaults inputs formal-inputs)
- formal-inputs
- store)))
- (cond
- ((string=? class "CommandLineTool")
- (state-let* ((job-state
- (run-command-line-tool name
- manifest-file
- channels
- cwl
- inputs
- scratch
- store
- batch-system
- #:guix-daemon-socket guix-daemon-socket)))
- (state-return (command-line-tool-state job-state
- (assoc-ref* cwl "outputs")))))
- ((string=? class "ExpressionTool")
- (error "Workflow class not implemented yet" class))
- ((string=? class "Workflow")
- (state-let* ((propnet-state
- (schedule-propnet (workflow-class->propnet name
- cwl
- scheduler
- batch-system)
- inputs)))
+ (if (propnet? cwl-or-propnet)
+ (state-let* ((propnet-state (schedule-propnet cwl-or-propnet inputs)))
(state-return
(workflow-state propnet-state
- (assoc-ref* cwl "outputs"))))))))))
+ (scheduler-proc-formal-outputs proc))))
+ (let* ((class (assoc-ref* cwl-or-propnet "class"))
+ (formal-inputs (scheduler-proc-formal-inputs proc))
+ ;; We need to resolve inputs after adding defaults since
+ ;; the default values may contain uninterned File objects.
+ (inputs (resolve-inputs (add-defaults inputs formal-inputs)
+ formal-inputs
+ store)))
+ (cond
+ ((string=? class "CommandLineTool")
+ (state-let* ((job-state
+ (run-command-line-tool name
+ manifest-file
+ channels
+ cwl-or-propnet
+ inputs
+ scratch
+ store
+ batch-system
+ #:guix-daemon-socket guix-daemon-socket)))
+ (state-return (command-line-tool-state job-state
+ (scheduler-proc-formal-outputs proc)))))
+ ((string=? class "ExpressionTool")
+ (error "Workflow class not implemented yet" class))
+ (else
+ (assertion-violation class "Unexpected workflow class"))))))))
(define (poll state)
"Return updated state and current status of job @var{state} object as a
@@ -535,7 +567,9 @@ area need not be shared. @var{store} is the path to the shared ravanan store.
#:guix-daemon-socket guix-daemon-socket)))
(run-with-state
(let loop ((mstate ((scheduler-schedule scheduler)
- (scheduler-proc name cwl %nothing %nothing)
+ (workflow->scheduler-proc name cwl
+ scheduler batch-system
+ %nothing %nothing)
inputs
scheduler)))
;; Poll.