From 7d6d32fdf833f9e417725e02e36a3359271f5c3a Mon Sep 17 00:00:00 2001
From: Arun Isaac
Date: Tue, 24 Sep 2024 00:55:27 +0100
Subject: propnet: Allow propagators to update their state.

This will come in handy later when we implement compound
propagators (propagators that are themselves propagator networks).

* ravanan/command-line-tool.scm (command-line-tool-scheduler)[poll]:
Return two values---the status and the updated state.
* ravanan/job-state.scm (job-state-status): Return two values---the
status and the updated state.
* ravanan/propnet.scm (partition-map): New function.
(poll-propnet): Update state of a propagator in flight.
---
 ravanan/command-line-tool.scm | 20 +++++++++--------
 ravanan/job-state.scm         | 51 ++++++++++++++++++++++---------------------
 ravanan/propnet.scm           | 25 ++++++++++++++++-----
 3 files changed, 56 insertions(+), 40 deletions(-)

diff --git a/ravanan/command-line-tool.scm b/ravanan/command-line-tool.scm
index c898430..9878c95 100644
--- a/ravanan/command-line-tool.scm
+++ b/ravanan/command-line-tool.scm
@@ -1119,9 +1119,9 @@ job state object."
                                  #:slurm-jwt slurm-jwt))))
 
   (define (poll state)
-    "Return current status of job @var{state} object---one of the symbols
-@code{pending} or @code{completed}. Raise an exception and exit if job has
-failed."
+    "Return current status and updated state of job @var{state} object. The status is
+one of the symbols @code{pending} or @code{completed}. Raise an exception and
+exit if job has failed."
     (guard (c ((job-failure? c)
                (let ((script (job-failure-script c)))
                  (user-error
@@ -1129,12 +1129,14 @@ failed."
                   script
                   (script->store-stdout-file script store)
                   (script->store-stderr-file script store)))))
-      (case (job-state-status state
-                              #:slurm-api-endpoint slurm-api-endpoint
-                              #:slurm-jwt slurm-jwt)
-        ((failed)
-         (raise-exception (job-failure (job-state-script state))))
-        (else => identity))))
+      (let ((status state (job-state-status state
+                                            #:slurm-api-endpoint slurm-api-endpoint
+                                            #:slurm-jwt slurm-jwt)))
+        (values (case status
+                  ((failed)
+                   (raise-exception (job-failure (job-state-script state))))
+                  (else => identity))
+                state))))
 
   (define (capture-output state)
     "Return output of completed job @var{state}."
diff --git a/ravanan/job-state.scm b/ravanan/job-state.scm
index 45ccb37..4ced645 100644
--- a/ravanan/job-state.scm
+++ b/ravanan/job-state.scm
@@ -55,31 +55,32 @@
    state))
 
 (define* (job-state-status state #:key slurm-api-endpoint slurm-jwt)
-  "Return current status of job with @var{state}---one of the symbols
-@code{completed}, @code{failed} or @code{pending}.
+  "Return current status and updated state of job with @var{state}. The status is
+one of the symbols @code{completed}, @code{failed} or @code{pending}.
 
 @var{slurm-api-endpoint} and @var{slurm-jwt} are the same as in
 @code{run-workflow} from @code{(ravanan workflow)}."
-  (cond
-   ;; Single machine jobs are run synchronously. So, they return success or
-   ;; failure immediately.
-   ((single-machine-job-state? state)
-    (if (single-machine-job-state-success? state)
-        'completed
-        'failed))
-   ;; Poll slurm for job state.
-   ((slurm-job-state? state)
-    (job-state (slurm-job-state-job-id state)
-               #:api-endpoint slurm-api-endpoint
-               #:jwt slurm-jwt))
-   ;; For vector states, poll each state element and return 'completed only if
-   ;; all state elements have completed.
-   ((vector? state)
-    (or (vector-every (lambda (state-element)
-                        (case (job-state-status state-element
-                                                #:slurm-api-endpoint slurm-api-endpoint
-                                                #:slurm-jwt slurm-jwt)
-                          ((completed) => identity)
-                          (else #f)))
-                      state)
-        'pending))))
+  (values (cond                         ; first value: the status
+           ;; Single machine jobs are run synchronously. So, they return success
+           ;; or failure immediately.
+           ((single-machine-job-state? state)
+            (if (single-machine-job-state-success? state)
+                'completed
+                'failed))
+           ;; Poll slurm for job state.
+           ((slurm-job-state? state)
+            (job-state (slurm-job-state-job-id state)
+                       #:api-endpoint slurm-api-endpoint
+                       #:jwt slurm-jwt))
+           ;; For vector states, poll each element; 'completed only if all have
+           ;; completed. NOTE(review): a 'failed element yields 'pending---confirm.
+           ((vector? state)
+            (or (vector-every (lambda (state-element)
+                                (case (job-state-status state-element
+                                                        #:slurm-api-endpoint slurm-api-endpoint
+                                                        #:slurm-jwt slurm-jwt)
+                                  ((completed) => identity)
+                                  (else #f)))
+                              state)
+                'pending)))
+          state))                       ; second value: STATE, passed through as is
diff --git a/ravanan/propnet.scm b/ravanan/propnet.scm
index 04ea31e..6110d93 100644
--- a/ravanan/propnet.scm
+++ b/ravanan/propnet.scm
@@ -77,6 +77,13 @@
   (propagators-in-flight propnet-state-propagators-in-flight)
   (propagators-inbox propnet-state-propagators-inbox))
 
+(define (partition-map pred proc lst)
+  "Partition @var{lst} into two lists using @var{pred} like @code{partition}. Then,
+map @var{proc} over each list and return both mapped lists as two values."
+  (let ((true-list false-list (partition pred lst)))
+    (values (map proc true-list)      ; from the elements satisfying PRED
+            (map proc false-list))))  ; from the remaining elements
+
 (define (activate-propagator schedule propagator inputs-alist)
   "Activate @var{propagator} with inputs from @var{inputs-alist}. If some
 required inputs are absent, do nothing. Schedule the propagator using
@@ -230,12 +237,18 @@ add to the inbox."
             (_
              (let ((finished-propagators
                     propagators-still-in-flight
-                    (partition (match-lambda
-                                 ((name . state)
-                                  (eq? ((scheduler-poll scheduler)
-                                        state)
-                                       'completed)))
-                               propagators-in-flight)))
+                    (partition-map (match-lambda
+                                     ((_ _ 'completed) #t)
+                                     (_ #f))
+                                   (match-lambda
+                                     ((name state _)
+                                      (cons name state)))
+                                   (map (match-lambda
+                                          ((name . state)
+                                           (let ((status state ((scheduler-poll scheduler)
+                                                                state)))
+                                             (list name state status))))
+                                        propagators-in-flight))))
                (match finished-propagators
                  ;; None of the propagators we checked have completed. Return a
                  ;; pending state.
-- 
cgit v1.2.3