From f7ef493180b8413b96b0ff7fe9e45bc8cd83d8d2 Mon Sep 17 00:00:00 2001 From: Arun Isaac Date: Mon, 20 Jan 2025 00:27:33 +0000 Subject: propnet: Introduce objects for polling. Let pollers return objects instead of two separate values. objects are essentially named 2-tuples. This will be more convenient than multiple values when dealing with the state monad. * ravanan/propnet.scm (): New record type. (poll-propnet): Accept and return objects. * ravanan/workflow.scm (workflow-scheduler)[poll]: Accept and return objects. (run-workflow): Accept objects from poll. --- ravanan/propnet.scm | 39 ++++++++++++++++++++++++--------------- ravanan/workflow.scm | 51 +++++++++++++++++++++++++++------------------------ 2 files changed, 51 insertions(+), 39 deletions(-) diff --git a/ravanan/propnet.scm b/ravanan/propnet.scm index da74dec..360aea4 100644 --- a/ravanan/propnet.scm +++ b/ravanan/propnet.scm @@ -40,6 +40,9 @@ scheduler-poll scheduler-capture-output schedule-propnet + state+status + state+status-state + state+status-status poll-propnet capture-propnet-output)) @@ -67,6 +70,12 @@ (poll scheduler-poll) (capture-output scheduler-capture-output)) +(define-immutable-record-type + (state+status state status) + state+status? + (state state+status-state) + (status state+status-status)) + (define-immutable-record-type (propnet-state propnet cells cells-inbox propagators-in-flight propagators-inbox) propnet-state? @@ -127,9 +136,9 @@ exists, return @code{#f}. @var{val} is compared using @code{equal?}." (propnet-propagators propnet))) (define (poll-propnet state) - "Poll propagator network @var{state}. Return two values---a status symbol (either -@code{completed} or @code{pending}) and the current state of the propagator -network." + "Poll propagator network @var{state}. Return a @code{} object +containing two values---the updated state of the propagator network and a status +symbol (either @code{completed} or @code{pending})." (define propnet (propnet-state-propnet state)) @@ -226,12 +235,12 @@ their states." ;; All propagators are finished. The propnet has stabilized. We are ;; done. Return all cell values. (() - (values 'completed - (propnet-state propnet - cells - cell-values-inbox - propagators-in-flight - propagators-inbox))) + (state+status (propnet-state propnet + cells + cell-values-inbox + propagators-in-flight + propagators-inbox) + 'completed)) ;; Propagators are still in flight. Check if any of them have ;; completed. (_ @@ -253,12 +262,12 @@ their states." ;; None of the propagators we checked have completed. Return a ;; pending state. (() - (values 'pending - (propnet-state propnet - cells - cell-values-inbox - propagators-still-in-flight - propagators-inbox))) + (state+status (propnet-state propnet + cells + cell-values-inbox + propagators-still-in-flight + propagators-inbox) + 'pending)) ;; Some of the propagators we checked have completed. Enqueue ;; their outputs in the cells inbox and loop. (_ diff --git a/ravanan/workflow.scm b/ravanan/workflow.scm index 13deb6f..3615664 100644 --- a/ravanan/workflow.scm +++ b/ravanan/workflow.scm @@ -311,9 +311,9 @@ job state object. @var{proc} may either be a @code{} object or a (assoc-ref* cwl "outputs"))))))))) (define (poll state) - "Return current status and updated state of job @var{state} object. The status is -one of the symbols @code{pending} or @code{completed}. Raise an exception and -exit if job has failed." + "Return updated state and current status of job @var{state} object as a +@code{} object. The status is one of the symbols @code{pending} or +@code{completed}." (guard (c ((job-failure? c) (let ((script (job-failure-script c))) (user-error @@ -326,28 +326,30 @@ exit if job has failed." ;; completed. ((vector? state) (let ((status state (vector-mapn poll state))) - (values (if (vector-every (cut eq? <> 'completed) - status) - 'completed - 'pending) - state))) + (state+status state + (if (vector-every (cut eq? <> 'completed) + status) + 'completed + 'pending)))) ;; Poll job state. Raise an exception if the job has failed. ((command-line-tool-state? state) - (values (case (job-state-status (command-line-tool-state-job-state state) - batch-system) - ((failed) - (raise-exception (job-failure - (job-state-script - (command-line-tool-state-job-state state))))) - (else => identity)) - state)) + (state+status state + (case (job-state-status (command-line-tool-state-job-state state) + batch-system) + ((failed) + (raise-exception (job-failure + (job-state-script + (command-line-tool-state-job-state state))))) + (else => identity)))) ;; Poll sub-workflow state. We do not need to check the status here since ;; job failures only occur at the level of a CommandLineTool. ((workflow-state? state) - (let ((status updated-propnet-state - (poll-propnet (workflow-state-propnet-state state)))) - (values status - (set-workflow-state-propnet-state state updated-propnet-state)))) + (let ((updated-state+status + (poll-propnet (workflow-state-propnet-state state)))) + (state+status + (set-workflow-state-propnet-state state + (state+status-state updated-state+status)) + (state+status-status updated-state+status)))) (else (assertion-violation state "Invalid state"))))) @@ -615,8 +617,8 @@ area need not be shared. @var{store} is the path to the shared ravanan store. inputs scheduler)))) ;; Poll. - (let ((status state ((scheduler-poll scheduler) state))) - (if (eq? status 'pending) + (let ((state+status ((scheduler-poll scheduler) state))) + (if (eq? (state+status-status state+status) 'pending) (begin ;; Pause before looping and polling again so we don't bother the ;; job server too often. @@ -627,6 +629,7 @@ area need not be shared. @var{store} is the path to the shared ravanan store. 0) ((slurm-api-batch-system? batch-system) %job-poll-interval))) - (loop state)) + (loop (state+status-state state+status))) ;; Capture outputs. - ((scheduler-capture-output scheduler) state)))))) + ((scheduler-capture-output scheduler) + (state+status-state state+status))))))) -- cgit v1.2.3