about summary refs log tree commit diff
diff options
context:
space:
mode:
author Arun Isaac 2025-01-13 00:45:40 +0000
committer Arun Isaac 2025-01-21 00:16:16 +0000
commit 231c3532bc6b6325cebf2d0d28d41f7ea4c2ace1 (patch)
tree c160a907a38e0a86282af9ed147526074a0e682e
parent 54e83118a2f99eb7c3c916878afc6f8132e9aeef (diff)
download ravanan-231c3532bc6b6325cebf2d0d28d41f7ea4c2ace1.tar.gz
ravanan-231c3532bc6b6325cebf2d0d28d41f7ea4c2ace1.tar.lz
ravanan-231c3532bc6b6325cebf2d0d28d41f7ea4c2ace1.zip
workflow: Return state of scheduled jobs as a state-monadic value.
* ravanan/command-line-tool.scm (run-command-line-tool): Return state of scheduled jobs as a state-monadic value.
* ravanan/workflow.scm (workflow-scheduler)[schedule]: Return state of scheduled jobs as a state-monadic value.
(run-workflow): Run state of scheduled workflow in the state monad.
* ravanan/propnet.scm (poll-propnet): Run state of scheduled jobs in the state monad.
[schedule-propagators]: New function.
-rw-r--r-- ravanan/command-line-tool.scm 66
-rw-r--r-- ravanan/propnet.scm 24
-rw-r--r-- ravanan/workflow.scm 47
3 files changed, 73 insertions, 64 deletions
diff --git a/ravanan/command-line-tool.scm b/ravanan/command-line-tool.scm
index 7da91e3..a5b800f 100644
--- a/ravanan/command-line-tool.scm
+++ b/ravanan/command-line-tool.scm
@@ -389,7 +389,8 @@ path."
scratch store batch-system
#:key guix-daemon-socket)
"Run @code{CommandLineTool} class workflow @var{cwl} named @var{name} with
-@var{inputs} using tools from Guix manifest in @var{manifest-file}.
+@var{inputs} using tools from Guix manifest in @var{manifest-file}. Return a
+state-monadic job state object.
@var{channels}, @var{scratch}, @var{store}, @var{batch-system} and
@var{guix-daemon-socket} are the same as in @code{run-workflow} from
@@ -422,11 +423,12 @@ path."
(if (file-exists? store-data-file)
;; Return a dummy success state object if script has already
;; been run successfully.
- (begin
- (format (current-error-port)
- "~a previously run; retrieving result from store~%"
- script)
- (single-machine-job-state script #t))
+ (state-return
+ (begin
+ (format (current-error-port)
+ "~a previously run; retrieving result from store~%"
+ script)
+ (single-machine-job-state script #t)))
;; Run script if it has not already been run.
(begin
;; Delete output files directory if an incomplete one exists
@@ -440,39 +442,35 @@ path."
(mkdir store-files-directory)
(cond
((eq? batch-system 'single-machine)
- (single-machine-job-state script
- (run-with-state
- (single-machine:submit-job
- `(("WORKFLOW_OUTPUT_DIRECTORY" .
- ,store-files-directory)
- ("WORKFLOW_OUTPUT_DATA_FILE" .
- ,store-data-file))
- stdout-file
- stderr-file
- script))))
+ (state-let* ((success? (single-machine:submit-job
+ `(("WORKFLOW_OUTPUT_DIRECTORY" .
+ ,store-files-directory)
+ ("WORKFLOW_OUTPUT_DATA_FILE" .
+ ,store-data-file))
+ stdout-file
+ stderr-file
+ script)))
+ (state-return (single-machine-job-state script success?))))
((slurm-api-batch-system? batch-system)
- (format (current-error-port)
- "Submitting job ~a~%"
- script)
- (let ((job-id (run-with-state
- (slurm:submit-job `(("WORKFLOW_OUTPUT_DIRECTORY" .
- ,store-files-directory)
- ("WORKFLOW_OUTPUT_DATA_FILE" .
- ,store-data-file))
- stdout-file
- stderr-file
- cpus
- name
- script
- #:api-endpoint (slurm-api-batch-system-endpoint batch-system)
- #:jwt (slurm-api-batch-system-jwt batch-system)
- #:partition (slurm-api-batch-system-partition batch-system)
- #:nice (slurm-api-batch-system-nice batch-system)))))
+ (state-let* ((job-id
+ (slurm:submit-job `(("WORKFLOW_OUTPUT_DIRECTORY" .
+ ,store-files-directory)
+ ("WORKFLOW_OUTPUT_DATA_FILE" .
+ ,store-data-file))
+ stdout-file
+ stderr-file
+ cpus
+ name
+ script
+ #:api-endpoint (slurm-api-batch-system-endpoint batch-system)
+ #:jwt (slurm-api-batch-system-jwt batch-system)
+ #:partition (slurm-api-batch-system-partition batch-system)
+ #:nice (slurm-api-batch-system-nice batch-system))))
(format (current-error-port)
"~a submitted as job ID ~a~%"
script
job-id)
- (slurm-job-state script job-id)))
+ (state-return (slurm-job-state script job-id))))
(else
(assertion-violation batch-system "Invalid batch system")))))))
diff --git a/ravanan/propnet.scm b/ravanan/propnet.scm
index dfe61ac..da74dec 100644
--- a/ravanan/propnet.scm
+++ b/ravanan/propnet.scm
@@ -161,6 +161,21 @@ add to the inbox."
(maybe-assoc-ref (just cells) cell-name))))
(propagator-inputs propagator))))
+ (define (schedule-propagators propagators cells)
+ "Schedule all propagators among @var{propagators} whose inputs are present in
+@var{cells}. Return an association list mapping scheduled propagator names to
+their states."
+ (append-map (lambda (propagator)
+ (maybe-alist
+ (cons (propagator-name propagator)
+ (maybe-let* ((propagator-state
+ (activate-propagator
+ scheduler
+ propagator
+ (propagator-input-values cells propagator))))
+ (just (run-with-state propagator-state))))))
+ propagators))
+
;; We implement propagator networks as a state machine. The state consists of
;; the current values of all the cells and the list of all propagators
;; currently in flight. Each iteration of loop represents one state
@@ -269,14 +284,7 @@ add to the inbox."
;; application of propnets, this will never result in the same
;; step being recomputed; so this approach does not come at a
;; higher computational cost.
- (append (append-map (lambda (propagator)
- (maybe-alist
- (cons (propagator-name propagator)
- (activate-propagator
- scheduler
- propagator
- (propagator-input-values cells propagator)))))
- propagators-inbox)
+ (append (schedule-propagators propagators-inbox cells)
propagators-in-flight))))))))
(define (capture-propnet-output state)
diff --git a/ravanan/workflow.scm b/ravanan/workflow.scm
index 09d733c..1ef9284 100644
--- a/ravanan/workflow.scm
+++ b/ravanan/workflow.scm
@@ -1,5 +1,5 @@
;;; ravanan --- High-reproducibility CWL runner powered by Guix
-;;; Copyright © 2024 Arun Isaac <arunisaac@systemreboot.net>
+;;; Copyright © 2024, 2025 Arun Isaac <arunisaac@systemreboot.net>
;;;
;;; This file is part of ravanan.
;;;
@@ -288,26 +288,28 @@ job state object. @var{proc} may either be a @code{<propnet>} object or a
store)))
(cond
((string=? class "CommandLineTool")
- (command-line-tool-state
- (run-command-line-tool name
- manifest-file
- channels
- cwl
- inputs
- scratch
- store
- batch-system
- #:guix-daemon-socket guix-daemon-socket)
- (assoc-ref* cwl "outputs")))
+ (state-let* ((job-state
+ (run-command-line-tool name
+ manifest-file
+ channels
+ cwl
+ inputs
+ scratch
+ store
+ batch-system
+ #:guix-daemon-socket guix-daemon-socket)))
+ (state-return (command-line-tool-state job-state
+ (assoc-ref* cwl "outputs")))))
((string=? class "ExpressionTool")
(error "Workflow class not implemented yet" class))
((string=? class "Workflow")
- (workflow-state (schedule-propnet (workflow-class->propnet name
- cwl
- scheduler
- batch-system)
- inputs)
- (assoc-ref* cwl "outputs"))))))))
+ (state-return
+ (workflow-state (schedule-propnet (workflow-class->propnet name
+ cwl
+ scheduler
+ batch-system)
+ inputs)
+ (assoc-ref* cwl "outputs")))))))))
(define (poll state)
"Return current status and updated state of job @var{state} object. The status is
@@ -610,10 +612,11 @@ area need not be shared. @var{store} is the path to the shared ravanan store.
(let ((scheduler (workflow-scheduler
manifest-file channels scratch store batch-system
#:guix-daemon-socket guix-daemon-socket)))
- (let loop ((state ((scheduler-schedule scheduler)
- (scheduler-proc name cwl %nothing %nothing)
- inputs
- scheduler)))
+ (let loop ((state (run-with-state
+ ((scheduler-schedule scheduler)
+ (scheduler-proc name cwl %nothing %nothing)
+ inputs
+ scheduler))))
;; Poll.
(let ((status state ((scheduler-poll scheduler) state)))
(if (eq? status 'pending)