aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArun Isaac2025-01-20 01:37:18 +0000
committerArun Isaac2025-01-21 00:16:17 +0000
commit631ca04925a7f65caea4abdddf599a1964ce4d35 (patch)
tree2aec171dd06546e04f1527b800f0b275fcbc19dc
parente3fb375aead20c1ef6518750cc6dbc2375f57bc9 (diff)
downloadravanan-631ca04925a7f65caea4abdddf599a1964ce4d35.tar.gz
ravanan-631ca04925a7f65caea4abdddf599a1964ce4d35.tar.lz
ravanan-631ca04925a7f65caea4abdddf599a1964ce4d35.zip
propnet: Make interface state-monadic.
* ravanan/job-state.scm (job-state-status): Return a state-monadic value. * ravanan/propnet.scm: Import (srfi srfi-26). (schedule-propnet, poll-propnet, capture-propnet-output): Return a state-monadic value. * ravanan/workflow.scm (workflow-scheduler)[schedule, poll, capture-output]: Return a state-monadic value. (run-workflow): Accept state-monadic values from schedule, poll and capture-output.
-rw-r--r--ravanan/job-state.scm33
-rw-r--r--ravanan/propnet.scm298
-rw-r--r--ravanan/workflow.scm160
3 files changed, 268 insertions, 223 deletions
diff --git a/ravanan/job-state.scm b/ravanan/job-state.scm
index aa709f0..a769698 100644
--- a/ravanan/job-state.scm
+++ b/ravanan/job-state.scm
@@ -57,28 +57,29 @@
state))
(define* (job-state-status state batch-system)
- "Return current status and updated state of job with @var{state} on
-@var{batch-system}. The status is one of the symbols @code{completed},
-@code{failed} or @code{pending}."
+ "Return current status of job with @var{state} on @var{batch-system}. The status
+is one of the symbols @code{completed}, @code{failed} or @code{pending}
+encapsulated in the state monad."
(cond
;; Single machine jobs are run synchronously. So, they return success or
;; failure immediately.
((single-machine-job-state? state)
- (if (single-machine-job-state-success? state)
- 'completed
- 'failed))
+ (state-return
+ (if (single-machine-job-state-success? state)
+ 'completed
+ 'failed)))
;; Poll slurm for job state.
((slurm-job-state? state)
- (run-with-state
- (job-state (slurm-job-state-job-id state)
- #:api-endpoint (slurm-api-batch-system-endpoint batch-system)
- #:jwt (slurm-api-batch-system-jwt batch-system))))
+ (job-state (slurm-job-state-job-id state)
+ #:api-endpoint (slurm-api-batch-system-endpoint batch-system)
+ #:jwt (slurm-api-batch-system-jwt batch-system)))
;; For list states, poll each state element and return 'completed only if all
;; state elements have completed.
((list? state)
- (or (every (lambda (state-element)
- (case (job-state-status state-element batch-system)
- ((completed) => identity)
- (else #f)))
- state)
- 'pending))))
+ (state-return
+ (or (every (lambda (state-element)
+ (case (job-state-status state-element batch-system)
+ ((completed) => identity)
+ (else #f)))
+ state)
+ 'pending)))))
diff --git a/ravanan/propnet.scm b/ravanan/propnet.scm
index 360aea4..34624b5 100644
--- a/ravanan/propnet.scm
+++ b/ravanan/propnet.scm
@@ -20,6 +20,7 @@
#:use-module ((rnrs base) #:select (assertion-violation))
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-9 gnu)
+ #:use-module (srfi srfi-26)
#:use-module (srfi srfi-71)
#:use-module (ice-9 match)
#:use-module (ravanan work monads)
@@ -124,21 +125,22 @@ exists, return @code{#f}. @var{val} is compared using @code{equal?}."
(propnet-propagators propnet)))
(define (schedule-propnet propnet initial-cell-values)
- "Start @var{propnet} with @var{initial-cell-values}, and return
+ "Start @var{propnet} with @var{initial-cell-values}, and return a state-monadic
@code{<propnet-state>} object."
- (propnet-state propnet
- (list)
- initial-cell-values
- (list)
- ;; Pre-schedule all propagators to ensure we
- ;; trigger those propagators that have no inputs
- ;; at all.
- (propnet-propagators propnet)))
+ (state-return
+ (propnet-state propnet
+ (state-return (list))
+ (state-return initial-cell-values)
+ (state-return (list))
+ ;; Pre-schedule all propagators to ensure we trigger those
+ ;; propagators that have no inputs at all.
+ (state-return (propnet-propagators propnet)))))
(define (poll-propnet state)
- "Poll propagator network @var{state}. Return a @code{<state+status>} object
-containing two values---the updated state of the propagator network and a status
-symbol (either @code{completed} or @code{pending})."
+ "Poll propagator network @var{state}. Return a state-monadic
+@code{<state+status>} object containing two values---the current state of the
+propagator network and a status symbol (either @code{completed} or
+@code{pending})."
(define propnet
(propnet-state-propnet state))
@@ -149,17 +151,19 @@ symbol (either @code{completed} or @code{pending})."
(match-lambda
((propagator-name . state)
"Convert the output of a completed propagator into new cell values to
-add to the inbox."
+add to the inbox. The return value is a state-monadic list."
(let ((output-mapping
(propagator-outputs
(find-propagator-name propnet propagator-name))))
- (map (match-lambda
- ((output-name . value)
- (cons (or (assoc-ref output-mapping output-name)
- (assertion-violation output-name "Unknown output"))
- value)))
- ((scheduler-capture-output scheduler)
- state))))))
+ (state-let* ((captured-outputs ((scheduler-capture-output scheduler)
+ state)))
+ (state-return
+ (map (match-lambda
+ ((output-name . value)
+ (cons (or (assoc-ref output-mapping output-name)
+ (assertion-violation output-name "Unknown output"))
+ value)))
+ captured-outputs)))))))
(define (propagator-input-values cells propagator)
"Return input values for @var{propagator} extracted from @var{cells}."
@@ -173,129 +177,155 @@ add to the inbox."
(define (schedule-propagators propagators cells)
"Schedule all propagators among @var{propagators} whose inputs are present in
@var{cells}. Return an association list mapping scheduled propagator names to
-their states."
- (append-map (lambda (propagator)
- (maybe-alist
- (cons (propagator-name propagator)
- (maybe-let* ((propagator-state
- (activate-propagator
- scheduler
- propagator
- (propagator-input-values cells propagator))))
- (just (run-with-state propagator-state))))))
- propagators))
+their monadic states."
+ (state-map (match-lambda
+ ((name . mpropagator-state)
+ (state-let* ((propagator-state mpropagator-state))
+ (state-return (cons name propagator-state)))))
+ (append-map (lambda (propagator)
+ (maybe-alist
+ (cons (propagator-name propagator)
+ (activate-propagator
+ scheduler
+ propagator
+ (propagator-input-values cells propagator)))))
+ propagators)))
;; We implement propagator networks as a state machine. The state consists of
;; the current values of all the cells and the list of all propagators
;; currently in flight. Each iteration of loop represents one state
;; transition. This is a very functional approach. Propagator network
;; implementations don't necessarily have to be mutational.
- (let loop ((cells (propnet-state-cells state))
- (cell-values-inbox (propnet-state-cells-inbox state))
- (propagators-inbox (propnet-state-propagators-inbox state))
- (propagators-in-flight (propnet-state-propagators-in-flight state)))
- (match cell-values-inbox
- ;; Process one new cell value in inbox.
- (((cell-name . new-cell-value)
- tail-cell-values-inbox ...)
- (if ((propnet-value=? propnet)
- (maybe-assoc-ref (just cells) cell-name)
- (just new-cell-value))
- ;; It's the same value. Nothing to do.
- (loop cells
- tail-cell-values-inbox
- propagators-inbox
- propagators-in-flight)
- ;; Update the cell and activate propagators.
- (let ((cells (maybe-assoc-set cells
- (cons cell-name
- ((propnet-merge-values propnet)
- (maybe-assoc-ref (just cells) cell-name)
- (just new-cell-value))))))
- (loop cells
- tail-cell-values-inbox
+ (let loop ((mcells (propnet-state-cells state))
+ (mcell-values-inbox (propnet-state-cells-inbox state))
+ (mpropagators-inbox (propnet-state-propagators-inbox state))
+ (mpropagators-in-flight (propnet-state-propagators-in-flight state)))
+ (state-let* ((cells mcells)
+ (cell-values-inbox mcell-values-inbox)
+ (propagators-inbox mpropagators-inbox)
+ (propagators-in-flight mpropagators-in-flight))
+ (match cell-values-inbox
+ ;; Process one new cell value in inbox.
+ (((cell-name . new-cell-value)
+ tail-cell-values-inbox ...)
+ (if ((propnet-value=? propnet)
+ (maybe-assoc-ref (just cells) cell-name)
+ (just new-cell-value))
+ ;; It's the same value. Nothing to do.
+ (loop mcells
+ (state-return tail-cell-values-inbox)
+ mpropagators-inbox
+ mpropagators-in-flight)
+ ;; Update the cell and activate propagators.
+ (loop (state-return (maybe-assoc-set cells
+ (cons cell-name
+ ((propnet-merge-values propnet)
+ (maybe-assoc-ref (just cells) cell-name)
+ (just new-cell-value)))))
+ (state-return tail-cell-values-inbox)
;; Enqueue propagators that depend on cell. Union to avoid
;; scheduling the same propagator more than once.
- (lset-union eq?
- propagators-inbox
- (filter (lambda (propagator)
- (rassoc cell-name
- (propagator-inputs propagator)))
- (propnet-propagators propnet)))
- propagators-in-flight))))
- ;; In order to minimize the number of times a propagator is run, it is
- ;; important to start scheduling them only after all cells in
- ;; cell-values-inbox are serviced.
- (()
- (match propagators-inbox
- ;; Poll propagators in flight and update cell values if any of them are
- ;; done.
- (()
- (match propagators-in-flight
- ;; All propagators are finished. The propnet has stabilized. We are
- ;; done. Return all cell values.
- (()
- (state+status (propnet-state propnet
- cells
- cell-values-inbox
- propagators-in-flight
- propagators-inbox)
- 'completed))
- ;; Propagators are still in flight. Check if any of them have
- ;; completed.
- (_
- (let ((finished-propagators
- propagators-still-in-flight
- (partition-map (match-lambda
- ((_ _ 'completed) #t)
- (_ #f))
- (match-lambda
- ((name state _)
- (cons name state)))
- (map (match-lambda
+ (state-return
+ (lset-union eq?
+ propagators-inbox
+ (filter (lambda (propagator)
+ (rassoc cell-name
+ (propagator-inputs propagator)))
+ (propnet-propagators propnet))))
+ mpropagators-in-flight)))
+ ;; In order to minimize the number of times a propagator is run, it is
+ ;; important to start scheduling them only after all cells in
+ ;; cell-values-inbox are serviced.
+ (()
+ (match propagators-inbox
+ ;; Poll propagators in flight and update cell values if any of them
+ ;; are done.
+ (()
+ (match propagators-in-flight
+ ;; All propagators are finished. The propnet has stabilized. We
+ ;; are done. Return state and completed status.
+ (()
+ (state-return
+ (state+status (propnet-state propnet
+ mcells
+ mcell-values-inbox
+ mpropagators-in-flight
+ mpropagators-inbox)
+ 'completed)))
+ ;; Propagators are still in flight. Check if any of them have
+ ;; completed.
+ (_
+ (state-let* ((propagator-states
+ (state-map (match-lambda
((name . state)
- (let ((status state ((scheduler-poll scheduler)
- state)))
- (list name state status))))
- propagators-in-flight))))
- (match finished-propagators
- ;; None of the propagators we checked have completed. Return a
- ;; pending state.
- (()
- (state+status (propnet-state propnet
- cells
- cell-values-inbox
- propagators-still-in-flight
- propagators-inbox)
- 'pending))
- ;; Some of the propagators we checked have completed. Enqueue
- ;; their outputs in the cells inbox and loop.
- (_
- (loop cells
- (apply assoc-set
- cell-values-inbox
- (append-map propagator-state->cell-values
- finished-propagators))
- propagators-inbox
- propagators-still-in-flight)))))))
- ;; Schedule propagators in inbox.
- (_
- (loop cells
- cell-values-inbox
- (list)
- ;; We don't need to cancel or forget about previous runs of the
- ;; same propagator because cells only *accumulate* information;
- ;; they never remove it. Any previous runs of the same
- ;; propagator will only *add to* the information in the output
- ;; cells. Previous runs may be closer to completion and taking
- ;; advantage of their output may allow later stages to start
- ;; running sooner, thus improving throughput. In our CWL
- ;; application of propnets, this will never result in the same
- ;; step being recomputed; so this approach does not come at a
- ;; higher computational cost.
- (append (schedule-propagators propagators-inbox cells)
- propagators-in-flight))))))))
+ (state-let* ((state+status
+ ((scheduler-poll scheduler) state)))
+ (state-return
+ (list name
+ (state+status-state state+status)
+ (state+status-status state+status))))))
+ propagators-in-flight)))
+ (let ((mfinished-propagators
+ mpropagators-still-in-flight
+ (call-with-values (cut partition
+ (match-lambda
+ ((_ _ 'completed) #t)
+ (_ #f))
+ propagator-states)
+ (lambda lists
+ (apply values
+ (map (cut state-map
+ (match-lambda
+ ((name state _)
+ (state-return (cons name state))))
+ <>)
+ lists))))))
+ (state-let* ((finished-propagators mfinished-propagators))
+ (match finished-propagators
+ ;; None of the propagators we checked have completed.
+ ;; Return a pending state.
+ (()
+ (state-return
+ (state+status (propnet-state propnet
+ mcells
+ mcell-values-inbox
+ mpropagators-still-in-flight
+ mpropagators-inbox)
+ 'pending)))
+ ;; Some of the propagators we checked have completed.
+ ;; Enqueue their outputs in the cells inbox and loop.
+ (_
+ (loop mcells
+ (state-let* ((new-cell-values
+ (state-append-map propagator-state->cell-values
+ finished-propagators)))
+ (state-return (apply assoc-set
+ cell-values-inbox
+ new-cell-values)))
+ mpropagators-inbox
+ mpropagators-still-in-flight)))))))))
+ ;; Schedule propagators in inbox.
+ (_
+ (loop mcells
+ mcell-values-inbox
+ (state-return (list))
+ ;; We don't need to cancel or forget about previous runs of
+ ;; the same propagator because cells only *accumulate*
+ ;; information; they never remove it. Any previous runs of the
+ ;; same propagator will only *add to* the information in the
+ ;; output cells. Previous runs may be closer to completion and
+ ;; taking advantage of their output may allow later stages to
+ ;; start running sooner, thus improving throughput. In our CWL
+ ;; application of propnets, this will never result in the same
+ ;; step being recomputed; so this approach does not come at a
+ ;; higher computational cost.
+ (state-let* ((new-propagators-in-flight
+ (schedule-propagators propagators-inbox
+ cells)))
+ (state-return
+ (append new-propagators-in-flight
+ propagators-in-flight)))))))))))
(define (capture-propnet-output state)
- "Return output of propagator network @var{state}."
+ "Return output of propagator network @var{state} as a state-monadic value."
(propnet-state-cells state))
diff --git a/ravanan/workflow.scm b/ravanan/workflow.scm
index 3692ede..158edcd 100644
--- a/ravanan/workflow.scm
+++ b/ravanan/workflow.scm
@@ -249,8 +249,8 @@ propagator."
#:key guix-daemon-socket)
(define (schedule proc inputs scheduler)
"Schedule @var{proc} with inputs from the @var{inputs} association list. Return a
-job state object. @var{proc} may either be a @code{<propnet>} object or a
-@code{<scheduler-proc>} object."
+state-monadic job state object. @var{proc} may either be a @code{<propnet>}
+object or a @code{<scheduler-proc>} object."
(let* ((name (scheduler-proc-name proc))
(cwl (scheduler-proc-cwl proc))
(scatter (from-maybe (scheduler-proc-scatter proc)
@@ -261,7 +261,7 @@ job state object. @var{proc} may either be a @code{<propnet>} object or a
(if scatter
(case scatter-method
((dot-product)
- (apply map
+ (apply state-map
(lambda input-elements
;; Recurse with scattered inputs spliced in.
(schedule (scheduler-proc name cwl %nothing %nothing)
@@ -303,18 +303,21 @@ job state object. @var{proc} may either be a @code{<propnet>} object or a
((string=? class "ExpressionTool")
(error "Workflow class not implemented yet" class))
((string=? class "Workflow")
- (state-return
- (workflow-state (schedule-propnet (workflow-class->propnet name
- cwl
- scheduler
- batch-system)
- inputs)
- (assoc-ref* cwl "outputs")))))))))
+ (state-let* ((propnet-state
+ (schedule-propnet (workflow-class->propnet name
+ cwl
+ scheduler
+ batch-system)
+ inputs)))
+ (state-return
+ (workflow-state propnet-state
+ (assoc-ref* cwl "outputs"))))))))))
(define (poll state)
"Return updated state and current status of job @var{state} object as a
-@code{<state+status>} object. The status is one of the symbols @code{pending} or
-@code{completed}."
+state-monadic @code{<state+status>} object. The status is one of the symbols
+@code{pending} or @code{completed}. Within the monadic value, raise an exception
+and exit if job has failed."
(guard (c ((job-failure? c)
(let ((script (job-failure-script c)))
(user-error
@@ -326,31 +329,34 @@ job state object. @var{proc} may either be a @code{<propnet>} object or a
;; Return list states as completed only if all state elements in it are
;; completed.
((list? state)
- (if (every (lambda (state+status)
- (eq? (state+status-status state+status)
- 'completed))
- polled-states)
- (state+status state 'completed)
- (state+status state 'pending)))
+ (state-let* ((polled-states (state-map poll state)))
+ (state-return
+ (if (every (lambda (state+status)
+ (eq? (state+status-status state+status)
+ 'completed))
+ polled-states)
+ (state+status state 'completed)
+ (state+status state 'pending)))))
;; Poll job state. Raise an exception if the job has failed.
((command-line-tool-state? state)
- (state+status state
- (case (job-state-status (command-line-tool-state-job-state state)
- batch-system)
- ((failed)
- (raise-exception (job-failure
- (job-state-script
- (command-line-tool-state-job-state state)))))
- (else => identity))))
+ (let ((job-state (command-line-tool-state-job-state state)))
+ (state-let* ((status (job-state-status job-state batch-system)))
+ (state-return
+ (state+status state
+ (case status
+ ((failed)
+ (raise-exception (job-failure
+ (job-state-script job-state))))
+ (else => identity)))))))
;; Poll sub-workflow state. We do not need to check the status here since
;; job failures only occur at the level of a CommandLineTool.
((workflow-state? state)
- (let ((updated-state+status
- (poll-propnet (workflow-state-propnet-state state))))
- (state+status
- (set-workflow-state-propnet-state state
- (state+status-state updated-state+status))
- (state+status-status updated-state+status))))
+ (state-let* ((updated-state+status
+ (poll-propnet (workflow-state-propnet-state state))))
+ (state-return
+ (state+status (set-workflow-state-propnet-state
+ state (state+status-state updated-state+status))
+ (state+status-status updated-state+status)))))
(else
(assertion-violation state "Invalid state")))))
@@ -388,36 +394,42 @@ is the class of the workflow."
"Return output of completed job @var{state}."
(cond
((workflow-state? state)
- (filter-outputs "Workflow"
- (capture-propnet-output
- (workflow-state-propnet-state state))
- (workflow-state-formal-outputs state)))
+ (state-let* ((outputs (capture-propnet-output
+ (workflow-state-propnet-state state))))
+ (state-return
+ (filter-outputs "Workflow"
+ outputs
+ (workflow-state-formal-outputs state)))))
((list? state)
;; Combine outputs from individual state elements.
- (match (map capture-output state)
- ((and (head-output _ ...)
- outputs)
- (map (match-lambda
- ((id . value)
- (cons id
- (map->vector (lambda (output)
- ;; FIXME: Is this the correct way to
- ;; handle missing outputs?
- (or (assoc-ref output id)
- 'null))
- outputs))))
- head-output))))
+ (state-let* ((captured-outputs (state-map capture-output state)))
+ (match captured-outputs
+ ((and (head-output _ ...)
+ outputs)
+ (state-return
+ (map (match-lambda
+ ((id . value)
+ (cons id
+ (map->vector (lambda (output)
+ ;; FIXME: Is this the correct way to
+ ;; handle missing outputs?
+ (or (assoc-ref output id)
+ 'null))
+ outputs))))
+ head-output))))))
(else
;; Log progress and return captured output.
(let ((script (job-state-script (command-line-tool-state-job-state state))))
- (format (current-error-port)
- "~a completed; logs at ~a and ~a~%"
- script
- (script->store-stdout-file script store)
- (script->store-stderr-file script store))
- (filter-outputs "CommandLineTool"
- (capture-command-line-tool-output script store)
- (command-line-tool-state-formal-outputs state))))))
+ (state-return
+ (begin
+ (format (current-error-port)
+ "~a completed; logs at ~a and ~a~%"
+ script
+ (script->store-stdout-file script store)
+ (script->store-stderr-file script store))
+ (filter-outputs "CommandLineTool"
+ (capture-command-line-tool-output script store)
+ (command-line-tool-state-formal-outputs state))))))))
(scheduler schedule poll capture-output))
@@ -612,25 +624,27 @@ area need not be shared. @var{store} is the path to the shared ravanan store.
(let ((scheduler (workflow-scheduler
manifest-file channels scratch store batch-system
#:guix-daemon-socket guix-daemon-socket)))
- (let loop ((state (run-with-state
- ((scheduler-schedule scheduler)
- (scheduler-proc name cwl %nothing %nothing)
- inputs
- scheduler))))
- ;; Poll.
- (let ((state+status ((scheduler-poll scheduler) state)))
- (if (eq? (state+status-status state+status) 'pending)
- (begin
- ;; Pause before looping and polling again so we don't bother the
- ;; job server too often.
- (sleep (cond
+ (run-with-state
+ (let loop ((mstate ((scheduler-schedule scheduler)
+ (scheduler-proc name cwl %nothing %nothing)
+ inputs
+ scheduler)))
+ ;; Poll.
+ (state-let* ((state mstate)
+ (state+status ((scheduler-poll scheduler) state)))
+ (if (eq? (state+status-status state+status)
+ 'pending)
+ (begin
+ ;; Pause before looping and polling again so we don't bother the
+ ;; job server too often.
+ (sleep (cond
;; Single machine jobs are run synchronously. So, there
;; is no need to wait to poll them.
((eq? batch-system 'single-machine)
0)
((slurm-api-batch-system? batch-system)
%job-poll-interval)))
- (loop (state+status-state state+status)))
- ;; Capture outputs.
- ((scheduler-capture-output scheduler)
- (state+status-state state+status)))))))
+ (loop (state-return (state+status-state state+status))))
+ ;; Capture outputs.
+ ((scheduler-capture-output scheduler)
+ (state+status-state state+status))))))))