about summary refs log tree commit diff
diff options
context:
space:
mode:
author Arun Isaac 2025-01-13 00:45:40 +0000
committer Arun Isaac 2025-01-21 00:16:16 +0000
commit 231c3532bc6b6325cebf2d0d28d41f7ea4c2ace1 (patch)
tree c160a907a38e0a86282af9ed147526074a0e682e
parent 54e83118a2f99eb7c3c916878afc6f8132e9aeef (diff)
download ravanan-231c3532bc6b6325cebf2d0d28d41f7ea4c2ace1.tar.gz
ravanan-231c3532bc6b6325cebf2d0d28d41f7ea4c2ace1.tar.lz
ravanan-231c3532bc6b6325cebf2d0d28d41f7ea4c2ace1.zip
workflow: Return state of scheduled jobs as a state-monadic value.
* ravanan/command-line-tool.scm (run-command-line-tool): Return state
of scheduled jobs as a state-monadic value.
* ravanan/workflow.scm (workflow-scheduler)[schedule]: Return state of
scheduled jobs as a state-monadic value.
(run-workflow): Run state of scheduled workflow in the state monad.
* ravanan/propnet.scm (poll-propnet): Run state of scheduled jobs in
the state monad.
[schedule-propagators]: New function.
-rw-r--r-- ravanan/command-line-tool.scm 66
-rw-r--r-- ravanan/propnet.scm 24
-rw-r--r-- ravanan/workflow.scm 47
3 files changed, 73 insertions, 64 deletions
diff --git a/ravanan/command-line-tool.scm b/ravanan/command-line-tool.scm
index 7da91e3..a5b800f 100644
--- a/ravanan/command-line-tool.scm
+++ b/ravanan/command-line-tool.scm
@@ -389,7 +389,8 @@ path."
                                 scratch store batch-system
                                 #:key guix-daemon-socket)
   "Run @code{CommandLineTool} class workflow @var{cwl} named @var{name} with
-@var{inputs} using tools from Guix manifest in @var{manifest-file}.
+@var{inputs} using tools from Guix manifest in @var{manifest-file}. Return a
+state-monadic job state object.
 
 @var{channels}, @var{scratch}, @var{store}, @var{batch-system} and
 @var{guix-daemon-socket} are the same as in @code{run-workflow} from
@@ -422,11 +423,12 @@ path."
     (if (file-exists? store-data-file)
         ;; Return a dummy success state object if script has already
         ;; been run successfully.
-        (begin
-          (format (current-error-port)
-                  "~a previously run; retrieving result from store~%"
-                  script)
-          (single-machine-job-state script #t))
+        (state-return
+         (begin
+           (format (current-error-port)
+                   "~a previously run; retrieving result from store~%"
+                   script)
+           (single-machine-job-state script #t)))
         ;; Run script if it has not already been run.
         (begin
           ;; Delete output files directory if an incomplete one exists
@@ -440,39 +442,35 @@ path."
           (mkdir store-files-directory)
           (cond
            ((eq? batch-system 'single-machine)
-            (single-machine-job-state script
-                                      (run-with-state
-                                       (single-machine:submit-job
-                                        `(("WORKFLOW_OUTPUT_DIRECTORY" .
-                                           ,store-files-directory)
-                                          ("WORKFLOW_OUTPUT_DATA_FILE" .
-                                           ,store-data-file))
-                                        stdout-file
-                                        stderr-file
-                                        script))))
+            (state-let* ((success? (single-machine:submit-job
+                                    `(("WORKFLOW_OUTPUT_DIRECTORY" .
+                                       ,store-files-directory)
+                                      ("WORKFLOW_OUTPUT_DATA_FILE" .
+                                       ,store-data-file))
+                                    stdout-file
+                                    stderr-file
+                                    script)))
+              (state-return (single-machine-job-state script success?))))
            ((slurm-api-batch-system? batch-system)
-            (format (current-error-port)
-                    "Submitting job ~a~%"
-                    script)
-            (let ((job-id (run-with-state
-                           (slurm:submit-job `(("WORKFLOW_OUTPUT_DIRECTORY" .
-                                                ,store-files-directory)
-                                               ("WORKFLOW_OUTPUT_DATA_FILE" .
-                                                ,store-data-file))
-                                             stdout-file
-                                             stderr-file
-                                             cpus
-                                             name
-                                             script
-                                             #:api-endpoint (slurm-api-batch-system-endpoint batch-system)
-                                             #:jwt (slurm-api-batch-system-jwt batch-system)
-                                             #:partition (slurm-api-batch-system-partition batch-system)
-                                             #:nice (slurm-api-batch-system-nice batch-system)))))
+            (state-let* ((job-id
+                          (slurm:submit-job `(("WORKFLOW_OUTPUT_DIRECTORY" .
+                                               ,store-files-directory)
+                                              ("WORKFLOW_OUTPUT_DATA_FILE" .
+                                               ,store-data-file))
+                                            stdout-file
+                                            stderr-file
+                                            cpus
+                                            name
+                                            script
+                                            #:api-endpoint (slurm-api-batch-system-endpoint batch-system)
+                                            #:jwt (slurm-api-batch-system-jwt batch-system)
+                                            #:partition (slurm-api-batch-system-partition batch-system)
+                                            #:nice (slurm-api-batch-system-nice batch-system))))
               (format (current-error-port)
                       "~a submitted as job ID ~a~%"
                       script
                       job-id)
-              (slurm-job-state script job-id)))
+              (state-return (slurm-job-state script job-id))))
            (else
             (assertion-violation batch-system "Invalid batch system")))))))
 
diff --git a/ravanan/propnet.scm b/ravanan/propnet.scm
index dfe61ac..da74dec 100644
--- a/ravanan/propnet.scm
+++ b/ravanan/propnet.scm
@@ -161,6 +161,21 @@ add to the inbox."
                          (maybe-assoc-ref (just cells) cell-name))))
                 (propagator-inputs propagator))))
 
+  (define (schedule-propagators propagators cells)
+    "Schedule all propagators among @var{propagators} whose inputs are present in
+@var{cells}. Return an association list mapping scheduled propagator names to
+their states."
+    (append-map (lambda (propagator)
+                  (maybe-alist
+                   (cons (propagator-name propagator)
+                         (maybe-let* ((propagator-state
+                                       (activate-propagator
+                                        scheduler
+                                        propagator
+                                        (propagator-input-values cells propagator))))
+                           (just (run-with-state propagator-state))))))
+                propagators))
+
   ;; We implement propagator networks as a state machine. The state consists of
   ;; the current values of all the cells and the list of all propagators
   ;; currently in flight. Each iteration of loop represents one state
@@ -269,14 +284,7 @@ add to the inbox."
                 ;; application of propnets, this will never result in the same
                 ;; step being recomputed; so this approach does not come at a
                 ;; higher computational cost.
-                (append (append-map (lambda (propagator)
-                                      (maybe-alist
-                                       (cons (propagator-name propagator)
-                                             (activate-propagator
-                                              scheduler
-                                              propagator
-                                              (propagator-input-values cells propagator)))))
-                                    propagators-inbox)
+                (append (schedule-propagators propagators-inbox cells)
                         propagators-in-flight))))))))
 
 (define (capture-propnet-output state)
diff --git a/ravanan/workflow.scm b/ravanan/workflow.scm
index 09d733c..1ef9284 100644
--- a/ravanan/workflow.scm
+++ b/ravanan/workflow.scm
@@ -1,5 +1,5 @@
 ;;; ravanan --- High-reproducibility CWL runner powered by Guix
-;;; Copyright © 2024 Arun Isaac <arunisaac@systemreboot.net>
+;;; Copyright © 2024, 2025 Arun Isaac <arunisaac@systemreboot.net>
 ;;;
 ;;; This file is part of ravanan.
 ;;;
@@ -288,26 +288,28 @@ job state object. @var{proc} may either be a @code{<propnet>} object or a
                                          store)))
             (cond
              ((string=? class "CommandLineTool")
-              (command-line-tool-state
-               (run-command-line-tool name
-                                      manifest-file
-                                      channels
-                                      cwl
-                                      inputs
-                                      scratch
-                                      store
-                                      batch-system
-                                      #:guix-daemon-socket guix-daemon-socket)
-               (assoc-ref* cwl "outputs")))
+              (state-let* ((job-state
+                            (run-command-line-tool name
+                                                   manifest-file
+                                                   channels
+                                                   cwl
+                                                   inputs
+                                                   scratch
+                                                   store
+                                                   batch-system
+                                                   #:guix-daemon-socket guix-daemon-socket)))
+                (state-return (command-line-tool-state job-state
+                                                       (assoc-ref* cwl "outputs")))))
              ((string=? class "ExpressionTool")
               (error "Workflow class not implemented yet" class))
              ((string=? class "Workflow")
-              (workflow-state (schedule-propnet (workflow-class->propnet name
-                                                                         cwl
-                                                                         scheduler
-                                                                         batch-system)
-                                                inputs)
-                              (assoc-ref* cwl "outputs"))))))))
+              (state-return
+               (workflow-state (schedule-propnet (workflow-class->propnet name
+                                                                          cwl
+                                                                          scheduler
+                                                                          batch-system)
+                                                 inputs)
+                               (assoc-ref* cwl "outputs")))))))))
 
   (define (poll state)
     "Return current status and updated state of job @var{state} object. The status is
@@ -610,10 +612,11 @@ area need not be shared. @var{store} is the path to the shared ravanan store.
   (let ((scheduler (workflow-scheduler
                     manifest-file channels scratch store batch-system
                     #:guix-daemon-socket guix-daemon-socket)))
-    (let loop ((state ((scheduler-schedule scheduler)
-                       (scheduler-proc name cwl %nothing %nothing)
-                       inputs
-                       scheduler)))
+    (let loop ((state (run-with-state
+                       ((scheduler-schedule scheduler)
+                        (scheduler-proc name cwl %nothing %nothing)
+                        inputs
+                        scheduler))))
       ;; Poll.
       (let ((status state ((scheduler-poll scheduler) state)))
         (if (eq? status 'pending)