diff options
author | Arun Isaac | 2025-01-20 01:37:18 +0000 |
---|---|---|
committer | Arun Isaac | 2025-01-21 00:16:17 +0000 |
commit | 631ca04925a7f65caea4abdddf599a1964ce4d35 (patch) | |
tree | 2aec171dd06546e04f1527b800f0b275fcbc19dc | |
parent | e3fb375aead20c1ef6518750cc6dbc2375f57bc9 (diff) | |
download | ravanan-631ca04925a7f65caea4abdddf599a1964ce4d35.tar.gz ravanan-631ca04925a7f65caea4abdddf599a1964ce4d35.tar.lz ravanan-631ca04925a7f65caea4abdddf599a1964ce4d35.zip |
propnet: Make interface state-monadic.
* ravanan/job-state.scm (job-state-status): Return a state-monadic
value.
* ravanan/propnet.scm: Import (srfi srfi-26).
(schedule-propnet, poll-propnet, capture-propnet-output): Return a
state-monadic value.
* ravanan/workflow.scm (workflow-scheduler)[schedule, poll,
capture-output]: Return a state-monadic value.
(run-workflow): Accept state-monadic values from schedule, poll and
capture-output.
-rw-r--r-- | ravanan/job-state.scm | 33 | ||||
-rw-r--r-- | ravanan/propnet.scm | 298 | ||||
-rw-r--r-- | ravanan/workflow.scm | 160 |
3 files changed, 268 insertions, 223 deletions
diff --git a/ravanan/job-state.scm b/ravanan/job-state.scm index aa709f0..a769698 100644 --- a/ravanan/job-state.scm +++ b/ravanan/job-state.scm @@ -57,28 +57,29 @@ state)) (define* (job-state-status state batch-system) - "Return current status and updated state of job with @var{state} on -@var{batch-system}. The status is one of the symbols @code{completed}, -@code{failed} or @code{pending}." + "Return current status of job with @var{state} on @var{batch-system}. The status +is one of the symbols @code{completed}, @code{failed} or @code{pending} +encapsulated in the state monad." (cond ;; Single machine jobs are run synchronously. So, they return success or ;; failure immediately. ((single-machine-job-state? state) - (if (single-machine-job-state-success? state) - 'completed - 'failed)) + (state-return + (if (single-machine-job-state-success? state) + 'completed + 'failed))) ;; Poll slurm for job state. ((slurm-job-state? state) - (run-with-state - (job-state (slurm-job-state-job-id state) - #:api-endpoint (slurm-api-batch-system-endpoint batch-system) - #:jwt (slurm-api-batch-system-jwt batch-system)))) + (job-state (slurm-job-state-job-id state) + #:api-endpoint (slurm-api-batch-system-endpoint batch-system) + #:jwt (slurm-api-batch-system-jwt batch-system))) ;; For list states, poll each state element and return 'completed only if all ;; state elements have completed. ((list? state) - (or (every (lambda (state-element) - (case (job-state-status state-element batch-system) - ((completed) => identity) - (else #f))) - state) - 'pending)))) + (state-return + (or (every (lambda (state-element) + (case (job-state-status state-element batch-system) + ((completed) => identity) + (else #f))) + state) + 'pending))))) diff --git a/ravanan/propnet.scm b/ravanan/propnet.scm index 360aea4..34624b5 100644 --- a/ravanan/propnet.scm +++ b/ravanan/propnet.scm @@ -20,6 +20,7 @@ #:use-module ((rnrs base) #:select (assertion-violation)) #:use-module (srfi srfi-1) #:use-module (srfi srfi-9 gnu) + #:use-module (srfi srfi-26) #:use-module (srfi srfi-71) #:use-module (ice-9 match) #:use-module (ravanan work monads) @@ -124,21 +125,22 @@ exists, return @code{#f}. @var{val} is compared using @code{equal?}." (propnet-propagators propnet))) (define (schedule-propnet propnet initial-cell-values) - "Start @var{propnet} with @var{initial-cell-values}, and return + "Start @var{propnet} with @var{initial-cell-values}, and return a state-monadic @code{<propnet-state>} object." - (propnet-state propnet - (list) - initial-cell-values - (list) - ;; Pre-schedule all propagators to ensure we - ;; trigger those propagators that have no inputs - ;; at all. - (propnet-propagators propnet))) + (state-return + (propnet-state propnet + (state-return (list)) + (state-return initial-cell-values) + (state-return (list)) + ;; Pre-schedule all propagators to ensure we trigger those + ;; propagators that have no inputs at all. + (state-return (propnet-propagators propnet))))) (define (poll-propnet state) - "Poll propagator network @var{state}. Return a @code{<state+status>} object -containing two values---the updated state of the propagator network and a status -symbol (either @code{completed} or @code{pending})." + "Poll propagator network @var{state}. Return a state-monadic +@code{<state+status>} object containing two values---the current state of the +propagator network and a status symbol (either @code{completed} or +@code{pending})." (define propnet (propnet-state-propnet state)) @@ -149,17 +151,19 @@ symbol (either @code{completed} or @code{pending})." (match-lambda ((propagator-name . state) "Convert the output of a completed propagator into new cell values to -add to the inbox." +add to the inbox. The return value is a state-monadic list." (let ((output-mapping (propagator-outputs (find-propagator-name propnet propagator-name)))) - (map (match-lambda - ((output-name . value) - (cons (or (assoc-ref output-mapping output-name) - (assertion-violation output-name "Unknown output")) - value))) - ((scheduler-capture-output scheduler) - state)))))) + (state-let* ((captured-outputs ((scheduler-capture-output scheduler) + state))) + (state-return + (map (match-lambda + ((output-name . value) + (cons (or (assoc-ref output-mapping output-name) + (assertion-violation output-name "Unknown output")) + value))) + captured-outputs))))))) (define (propagator-input-values cells propagator) "Return input values for @var{propagator} extracted from @var{cells}." @@ -173,129 +177,155 @@ add to the inbox." (define (schedule-propagators propagators cells) "Schedule all propagators among @var{propagators} whose inputs are present in @var{cells}. Return an association list mapping scheduled propagator names to -their states." - (append-map (lambda (propagator) - (maybe-alist - (cons (propagator-name propagator) - (maybe-let* ((propagator-state - (activate-propagator - scheduler - propagator - (propagator-input-values cells propagator)))) - (just (run-with-state propagator-state)))))) - propagators)) +their monadic states." + (state-map (match-lambda + ((name . mpropagator-state) + (state-let* ((propagator-state mpropagator-state)) + (state-return (cons name propagator-state))))) + (append-map (lambda (propagator) + (maybe-alist + (cons (propagator-name propagator) + (activate-propagator + scheduler + propagator + (propagator-input-values cells propagator))))) + propagators))) ;; We implement propagator networks as a state machine. The state consists of ;; the current values of all the cells and the list of all propagators ;; currently in flight. Each iteration of loop represents one state ;; transition. This is a very functional approach. Propagator network ;; implementations don't necessarily have to be mutational. - (let loop ((cells (propnet-state-cells state)) - (cell-values-inbox (propnet-state-cells-inbox state)) - (propagators-inbox (propnet-state-propagators-inbox state)) - (propagators-in-flight (propnet-state-propagators-in-flight state))) - (match cell-values-inbox - ;; Process one new cell value in inbox. - (((cell-name . new-cell-value) - tail-cell-values-inbox ...) - (if ((propnet-value=? propnet) - (maybe-assoc-ref (just cells) cell-name) - (just new-cell-value)) - ;; It's the same value. Nothing to do. - (loop cells - tail-cell-values-inbox - propagators-inbox - propagators-in-flight) - ;; Update the cell and activate propagators. - (let ((cells (maybe-assoc-set cells - (cons cell-name - ((propnet-merge-values propnet) - (maybe-assoc-ref (just cells) cell-name) - (just new-cell-value)))))) - (loop cells - tail-cell-values-inbox + (let loop ((mcells (propnet-state-cells state)) + (mcell-values-inbox (propnet-state-cells-inbox state)) + (mpropagators-inbox (propnet-state-propagators-inbox state)) + (mpropagators-in-flight (propnet-state-propagators-in-flight state))) + (state-let* ((cells mcells) + (cell-values-inbox mcell-values-inbox) + (propagators-inbox mpropagators-inbox) + (propagators-in-flight mpropagators-in-flight)) + (match cell-values-inbox + ;; Process one new cell value in inbox. + (((cell-name . new-cell-value) + tail-cell-values-inbox ...) + (if ((propnet-value=? propnet) + (maybe-assoc-ref (just cells) cell-name) + (just new-cell-value)) + ;; It's the same value. Nothing to do. + (loop mcells + (state-return tail-cell-values-inbox) + mpropagators-inbox + mpropagators-in-flight) + ;; Update the cell and activate propagators. + (loop (state-return (maybe-assoc-set cells + (cons cell-name + ((propnet-merge-values propnet) + (maybe-assoc-ref (just cells) cell-name) + (just new-cell-value))))) + (state-return tail-cell-values-inbox) ;; Enqueue propagators that depend on cell. Union to avoid ;; scheduling the same propagator more than once. - (lset-union eq? - propagators-inbox - (filter (lambda (propagator) - (rassoc cell-name - (propagator-inputs propagator))) - (propnet-propagators propnet))) - propagators-in-flight)))) - ;; In order to minimize the number of times a propagator is run, it is - ;; important to start scheduling them only after all cells in - ;; cell-values-inbox are serviced. - (() - (match propagators-inbox - ;; Poll propagators in flight and update cell values if any of them are - ;; done. - (() - (match propagators-in-flight - ;; All propagators are finished. The propnet has stabilized. We are - ;; done. Return all cell values. - (() - (state+status (propnet-state propnet - cells - cell-values-inbox - propagators-in-flight - propagators-inbox) - 'completed)) - ;; Propagators are still in flight. Check if any of them have - ;; completed. - (_ - (let ((finished-propagators - propagators-still-in-flight - (partition-map (match-lambda - ((_ _ 'completed) #t) - (_ #f)) - (match-lambda - ((name state _) - (cons name state))) - (map (match-lambda + (state-return + (lset-union eq? + propagators-inbox + (filter (lambda (propagator) + (rassoc cell-name + (propagator-inputs propagator))) + (propnet-propagators propnet)))) + mpropagators-in-flight))) + ;; In order to minimize the number of times a propagator is run, it is + ;; important to start scheduling them only after all cells in + ;; cell-values-inbox are serviced. + (() + (match propagators-inbox + ;; Poll propagators in flight and update cell values if any of them + ;; are done. + (() + (match propagators-in-flight + ;; All propagators are finished. The propnet has stabilized. We + ;; are done. Return state and completed status. + (() + (state-return + (state+status (propnet-state propnet + mcells + mcell-values-inbox + mpropagators-in-flight + mpropagators-inbox) + 'completed))) + ;; Propagators are still in flight. Check if any of them have + ;; completed. + (_ + (state-let* ((propagator-states + (state-map (match-lambda ((name . state) - (let ((status state ((scheduler-poll scheduler) - state))) - (list name state status)))) - propagators-in-flight)))) - (match finished-propagators - ;; None of the propagators we checked have completed. Return a - ;; pending state. - (() - (state+status (propnet-state propnet - cells - cell-values-inbox - propagators-still-in-flight - propagators-inbox) - 'pending)) - ;; Some of the propagators we checked have completed. Enqueue - ;; their outputs in the cells inbox and loop. - (_ - (loop cells - (apply assoc-set - cell-values-inbox - (append-map propagator-state->cell-values - finished-propagators)) - propagators-inbox - propagators-still-in-flight))))))) - ;; Schedule propagators in inbox. - (_ - (loop cells - cell-values-inbox - (list) - ;; We don't need to cancel or forget about previous runs of the - ;; same propagator because cells only *accumulate* information; - ;; they never remove it. Any previous runs of the same - ;; propagator will only *add to* the information in the output - ;; cells. Previous runs may be closer to completion and taking - ;; advantage of their output may allow later stages to start - ;; running sooner, thus improving throughput. In our CWL - ;; application of propnets, this will never result in the same - ;; step being recomputed; so this approach does not come at a - ;; higher computational cost. - (append (schedule-propagators propagators-inbox cells) - propagators-in-flight)))))))) + (state-let* ((state+status + ((scheduler-poll scheduler) state))) + (state-return + (list name + (state+status-state state+status) + (state+status-status state+status)))))) + propagators-in-flight))) + (let ((mfinished-propagators + mpropagators-still-in-flight + (call-with-values (cut partition + (match-lambda + ((_ _ 'completed) #t) + (_ #f)) + propagator-states) + (lambda lists + (apply values + (map (cut state-map + (match-lambda + ((name state _) + (state-return (cons name state)))) + <>) + lists)))))) + (state-let* ((finished-propagators mfinished-propagators)) + (match finished-propagators + ;; None of the propagators we checked have completed. + ;; Return a pending state. + (() + (state-return + (state+status (propnet-state propnet + mcells + mcell-values-inbox + mpropagators-still-in-flight + mpropagators-inbox) + 'pending))) + ;; Some of the propagators we checked have completed. + ;; Enqueue their outputs in the cells inbox and loop. + (_ + (loop mcells + (state-let* ((new-cell-values + (state-append-map propagator-state->cell-values + finished-propagators))) + (state-return (apply assoc-set + cell-values-inbox + new-cell-values))) + mpropagators-inbox + mpropagators-still-in-flight))))))))) + ;; Schedule propagators in inbox. + (_ + (loop mcells + mcell-values-inbox + (state-return (list)) + ;; We don't need to cancel or forget about previous runs of + ;; the same propagator because cells only *accumulate* + ;; information; they never remove it. Any previous runs of the + ;; same propagator will only *add to* the information in the + ;; output cells. Previous runs may be closer to completion and + ;; taking advantage of their output may allow later stages to + ;; start running sooner, thus improving throughput. In our CWL + ;; application of propnets, this will never result in the same + ;; step being recomputed; so this approach does not come at a + ;; higher computational cost. + (state-let* ((new-propagators-in-flight + (schedule-propagators propagators-inbox + cells))) + (state-return + (append new-propagators-in-flight + propagators-in-flight))))))))))) (define (capture-propnet-output state) - "Return output of propagator network @var{state}." + "Return output of propagator network @var{state} as a state-monadic value." (propnet-state-cells state)) diff --git a/ravanan/workflow.scm b/ravanan/workflow.scm index 3692ede..158edcd 100644 --- a/ravanan/workflow.scm +++ b/ravanan/workflow.scm @@ -249,8 +249,8 @@ propagator." #:key guix-daemon-socket) (define (schedule proc inputs scheduler) "Schedule @var{proc} with inputs from the @var{inputs} association list. Return a -job state object. @var{proc} may either be a @code{<propnet>} object or a -@code{<scheduler-proc>} object." +state-monadic job state object. @var{proc} may either be a @code{<propnet>} +object or a @code{<scheduler-proc>} object." (let* ((name (scheduler-proc-name proc)) (cwl (scheduler-proc-cwl proc)) (scatter (from-maybe (scheduler-proc-scatter proc) @@ -261,7 +261,7 @@ job state object. @var{proc} may either be a @code{<propnet>} object or a (if scatter (case scatter-method ((dot-product) - (apply map + (apply state-map (lambda input-elements ;; Recurse with scattered inputs spliced in. (schedule (scheduler-proc name cwl %nothing %nothing) @@ -303,18 +303,21 @@ job state object. @var{proc} may either be a @code{<propnet>} object or a ((string=? class "ExpressionTool") (error "Workflow class not implemented yet" class)) ((string=? class "Workflow") - (state-return - (workflow-state (schedule-propnet (workflow-class->propnet name - cwl - scheduler - batch-system) - inputs) - (assoc-ref* cwl "outputs"))))))))) + (state-let* ((propnet-state + (schedule-propnet (workflow-class->propnet name + cwl + scheduler + batch-system) + inputs))) + (state-return + (workflow-state propnet-state + (assoc-ref* cwl "outputs")))))))))) (define (poll state) "Return updated state and current status of job @var{state} object as a -@code{<state+status>} object. The status is one of the symbols @code{pending} or -@code{completed}." +state-monadic @code{<state+status>} object. The status is one of the symbols +@code{pending} or @code{completed}. Within the monadic value, raise an exception +and exit if job has failed." (guard (c ((job-failure? c) (let ((script (job-failure-script c))) (user-error @@ -326,31 +329,34 @@ job state object. @var{proc} may either be a @code{<propnet>} object or a ;; Return list states as completed only if all state elements in it are ;; completed. ((list? state) - (if (every (lambda (state+status) - (eq? (state+status-status state+status) - 'completed)) - polled-states) - (state+status state 'completed) - (state+status state 'pending))) + (state-let* ((polled-states (state-map poll state))) + (state-return + (if (every (lambda (state+status) + (eq? (state+status-status state+status) + 'completed)) + polled-states) + (state+status state 'completed) + (state+status state 'pending))))) ;; Poll job state. Raise an exception if the job has failed. ((command-line-tool-state? state) - (state+status state - (case (job-state-status (command-line-tool-state-job-state state) - batch-system) - ((failed) - (raise-exception (job-failure - (job-state-script - (command-line-tool-state-job-state state))))) - (else => identity)))) + (let ((job-state (command-line-tool-state-job-state state))) + (state-let* ((status (job-state-status job-state batch-system))) + (state-return + (state+status state + (case status + ((failed) + (raise-exception (job-failure + (job-state-script job-state)))) + (else => identity))))))) ;; Poll sub-workflow state. We do not need to check the status here since ;; job failures only occur at the level of a CommandLineTool. ((workflow-state? state) - (let ((updated-state+status - (poll-propnet (workflow-state-propnet-state state)))) - (state+status - (set-workflow-state-propnet-state state - (state+status-state updated-state+status)) - (state+status-status updated-state+status)))) + (state-let* ((updated-state+status + (poll-propnet (workflow-state-propnet-state state)))) + (state-return + (state+status (set-workflow-state-propnet-state + state (state+status-state updated-state+status)) + (state+status-status updated-state+status))))) (else (assertion-violation state "Invalid state"))))) @@ -388,36 +394,42 @@ is the class of the workflow." "Return output of completed job @var{state}." (cond ((workflow-state? state) - (filter-outputs "Workflow" - (capture-propnet-output - (workflow-state-propnet-state state)) - (workflow-state-formal-outputs state))) + (state-let* ((outputs (capture-propnet-output + (workflow-state-propnet-state state)))) + (state-return + (filter-outputs "Workflow" + outputs + (workflow-state-formal-outputs state))))) ((list? state) ;; Combine outputs from individual state elements. - (match (map capture-output state) - ((and (head-output _ ...) - outputs) - (map (match-lambda - ((id . value) - (cons id - (map->vector (lambda (output) - ;; FIXME: Is this the correct way to - ;; handle missing outputs? - (or (assoc-ref output id) - 'null)) - outputs)))) - head-output)))) + (state-let* ((captured-outputs (state-map capture-output state))) + (match captured-outputs + ((and (head-output _ ...) + outputs) + (state-return + (map (match-lambda + ((id . value) + (cons id + (map->vector (lambda (output) + ;; FIXME: Is this the correct way to + ;; handle missing outputs? + (or (assoc-ref output id) + 'null)) + outputs)))) + head-output)))))) (else ;; Log progress and return captured output. (let ((script (job-state-script (command-line-tool-state-job-state state)))) - (format (current-error-port) - "~a completed; logs at ~a and ~a~%" - script - (script->store-stdout-file script store) - (script->store-stderr-file script store)) - (filter-outputs "CommandLineTool" - (capture-command-line-tool-output script store) - (command-line-tool-state-formal-outputs state)))))) + (state-return + (begin + (format (current-error-port) + "~a completed; logs at ~a and ~a~%" + script + (script->store-stdout-file script store) + (script->store-stderr-file script store)) + (filter-outputs "CommandLineTool" + (capture-command-line-tool-output script store) + (command-line-tool-state-formal-outputs state)))))))) (scheduler schedule poll capture-output)) @@ -612,25 +624,27 @@ area need not be shared. @var{store} is the path to the shared ravanan store. (let ((scheduler (workflow-scheduler manifest-file channels scratch store batch-system #:guix-daemon-socket guix-daemon-socket))) - (let loop ((state (run-with-state - ((scheduler-schedule scheduler) - (scheduler-proc name cwl %nothing %nothing) - inputs - scheduler)))) - ;; Poll. - (let ((state+status ((scheduler-poll scheduler) state))) - (if (eq? (state+status-status state+status) 'pending) - (begin - ;; Pause before looping and polling again so we don't bother the - ;; job server too often. - (sleep (cond + (run-with-state + (let loop ((mstate ((scheduler-schedule scheduler) + (scheduler-proc name cwl %nothing %nothing) + inputs + scheduler))) + ;; Poll. + (state-let* ((state mstate) + (state+status ((scheduler-poll scheduler) state))) + (if (eq? (state+status-status state+status) + 'pending) + (begin + ;; Pause before looping and polling again so we don't bother the + ;; job server too often. + (sleep (cond ;; Single machine jobs are run synchronously. So, there ;; is no need to wait to poll them. ((eq? batch-system 'single-machine) 0) ((slurm-api-batch-system? batch-system) %job-poll-interval))) - (loop (state+status-state state+status))) - ;; Capture outputs. - ((scheduler-capture-output scheduler) - (state+status-state state+status))))))) + (loop (state-return (state+status-state state+status)))) + ;; Capture outputs. + ((scheduler-capture-output scheduler) + (state+status-state state+status)))))))) |