From 7d6d32fdf833f9e417725e02e36a3359271f5c3a Mon Sep 17 00:00:00 2001 From: Arun Isaac Date: Tue, 24 Sep 2024 00:55:27 +0100 Subject: propnet: Allow propagators to update their state. This will come in handy later when we implement compound propagators (propagators that are themselves a propagator network). * ravanan/command-line-tool.scm (command-line-tool-scheduler)[poll]: Return two values---the status and the updated state. * ravanan/job-state.scm (job-state-status): Return two values---the status and the updated state. * ravanan/propnet.scm (partition-map): New function. (poll-propnet): Update state of a propagator in flight. --- ravanan/command-line-tool.scm | 20 +++++++++-------- ravanan/job-state.scm | 51 ++++++++++++++++++++++--------------------- ravanan/propnet.scm | 25 ++++++++++++++++----- 3 files changed, 56 insertions(+), 40 deletions(-) diff --git a/ravanan/command-line-tool.scm b/ravanan/command-line-tool.scm index c898430..9878c95 100644 --- a/ravanan/command-line-tool.scm +++ b/ravanan/command-line-tool.scm @@ -1119,9 +1119,9 @@ job state object." #:slurm-jwt slurm-jwt)))) (define (poll state) - "Return current status of job @var{state} object---one of the symbols -@code{pending} or @code{completed}. Raise an exception and exit if job has -failed." + "Return current status and updated state of job @var{state} object. The status is +one of the symbols @code{pending} or @code{completed}. Raise an exception and +exit if job has failed." (guard (c ((job-failure? c) (let ((script (job-failure-script c))) (user-error @@ -1129,12 +1129,14 @@ failed." script (script->store-stdout-file script store) (script->store-stderr-file script store))))) - (case (job-state-status state - #:slurm-api-endpoint slurm-api-endpoint - #:slurm-jwt slurm-jwt) - ((failed) - (raise-exception (job-failure (job-state-script state)))) - (else => identity)))) + (let ((status state (job-state-status state + #:slurm-api-endpoint slurm-api-endpoint + #:slurm-jwt slurm-jwt))) + (values (case status + ((failed) + (raise-exception (job-failure (job-state-script state)))) + (else => identity)) + state)))) (define (capture-output state) "Return output of completed job @var{state}." diff --git a/ravanan/job-state.scm b/ravanan/job-state.scm index 45ccb37..4ced645 100644 --- a/ravanan/job-state.scm +++ b/ravanan/job-state.scm @@ -55,31 +55,32 @@ state)) (define* (job-state-status state #:key slurm-api-endpoint slurm-jwt) - "Return current status of job with @var{state}---one of the symbols -@code{completed}, @code{failed} or @code{pending}. + "Return current status and updated state of job with @var{state}. The status is +one of the symbols @code{completed}, @code{failed} or @code{pending}. @var{slurm-api-endpoint} and @var{slurm-jwt} are the same as in @code{run-workflow} from @code{(ravanan workflow)}." - (cond - ;; Single machine jobs are run synchronously. So, they return success or - ;; failure immediately. - ((single-machine-job-state? state) - (if (single-machine-job-state-success? state) - 'completed - 'failed)) - ;; Poll slurm for job state. - ((slurm-job-state? state) - (job-state (slurm-job-state-job-id state) - #:api-endpoint slurm-api-endpoint - #:jwt slurm-jwt)) - ;; For vector states, poll each state element and return 'completed only if - ;; all state elements have completed. - ((vector? state) - (or (vector-every (lambda (state-element) - (case (job-state-status state-element - #:slurm-api-endpoint slurm-api-endpoint - #:slurm-jwt slurm-jwt) - ((completed) => identity) - (else #f))) - state) - 'pending)))) + (values (cond + ;; Single machine jobs are run synchronously. So, they return success + ;; or failure immediately. + ((single-machine-job-state? state) + (if (single-machine-job-state-success? state) + 'completed + 'failed)) + ;; Poll slurm for job state. + ((slurm-job-state? state) + (job-state (slurm-job-state-job-id state) + #:api-endpoint slurm-api-endpoint + #:jwt slurm-jwt)) + ;; For vector states, poll each state element and return 'completed + ;; only if all state elements have completed. + ((vector? state) + (or (vector-every (lambda (state-element) + (case (job-state-status state-element + #:slurm-api-endpoint slurm-api-endpoint + #:slurm-jwt slurm-jwt) + ((completed) => identity) + (else #f))) + state) + 'pending))) + state)) diff --git a/ravanan/propnet.scm b/ravanan/propnet.scm index 04ea31e..6110d93 100644 --- a/ravanan/propnet.scm +++ b/ravanan/propnet.scm @@ -77,6 +77,13 @@ (propagators-in-flight propnet-state-propagators-in-flight) (propagators-inbox propnet-state-propagators-inbox)) +(define (partition-map pred proc lst) + "Partition @var{lst} into two lists using @var{pred} like @code{partition}. Then, +map @var{proc} over both the lists and return the resulting lists." + (let ((true-list false-list (partition pred lst))) + (values (map proc true-list) + (map proc false-list)))) + (define (activate-propagator schedule propagator inputs-alist) "Activate @var{propagator} with inputs from @var{inputs-alist}. If some required inputs are absent, do nothing. Schedule the propagator using @@ -230,12 +237,18 @@ add to the inbox." (_ (let ((finished-propagators propagators-still-in-flight - (partition (match-lambda - ((name . state) - (eq? ((scheduler-poll scheduler) - state) - 'completed))) - propagators-in-flight))) + (partition-map (match-lambda + ((_ _ 'completed) #t) + (_ #f)) + (match-lambda + ((name state _) + (cons name state))) + (map (match-lambda + ((name . state) + (let ((status state ((scheduler-poll scheduler) + state))) + (list name state status)))) + propagators-in-flight)))) (match finished-propagators ;; None of the propagators we checked have completed. Return a ;; pending state. -- cgit v1.2.3