summary refs log tree commit diff
diff options
context:
space:
mode:
authorArun Isaac2024-09-24 01:10:35 +0100
committerArun Isaac2024-10-01 01:12:50 +0100
commit7b9ac45eecebb454a166990987d80998df1643ca (patch)
tree59daed7ad3bfecf94944e32d2d3e7f2f9df9dc97
parent7d6d32fdf833f9e417725e02e36a3359271f5c3a (diff)
downloadravanan-7b9ac45eecebb454a166990987d80998df1643ca.tar.gz
ravanan-7b9ac45eecebb454a166990987d80998df1643ca.tar.lz
ravanan-7b9ac45eecebb454a166990987d80998df1643ca.zip
command-line-tool: Move scheduler to (ravanan workflow).
* ravanan/command-line-tool.scm: Do not import (rnrs
conditions), (rnrs exceptions) and (ravanan propnet).
(command-line-tool-scheduler): Move to (ravanan workflow) as
workflow-scheduler.
(&job-failure, <scheduler-proc>): Move to (ravanan workflow).
(script->store-stdout-file, script->store-stderr-file,
capture-command-line-tool-output): Export functions.
* ravanan/workflow.scm: Import define-condition-type from (rnrs
conditions), (rnrs exceptions), (srfi srfi-9 gnu) and (ravanan
job-state).
(run-workflow): Call workflow-scheduler instead of
command-line-tool-scheduler.
-rw-r--r--ravanan/command-line-tool.scm118
-rw-r--r--ravanan/workflow.scm112
2 files changed, 114 insertions, 116 deletions
diff --git a/ravanan/command-line-tool.scm b/ravanan/command-line-tool.scm
index 9878c95..3c4eab5 100644
--- a/ravanan/command-line-tool.scm
+++ b/ravanan/command-line-tool.scm
@@ -18,8 +18,6 @@
 
 (define-module (ravanan command-line-tool)
   #:use-module ((rnrs base) #:select (assertion-violation error))
-  #:use-module ((rnrs conditions) #:select (define-condition-type))
-  #:use-module (rnrs exceptions)
   #:use-module (srfi srfi-1)
   #:use-module (srfi srfi-9 gnu)
   #:use-module (srfi srfi-26)
@@ -47,7 +45,6 @@
   #:use-module (yaml)
   #:use-module (ravanan glob)
   #:use-module (ravanan job-state)
-  #:use-module (ravanan propnet)
   #:use-module (ravanan reader)
   #:use-module (ravanan slurm-api)
   #:use-module (ravanan work command-line-tool)
@@ -57,15 +54,12 @@
   #:use-module (ravanan work utils)
   #:use-module (ravanan work vectors)
   #:export (run-command-line-tool
-            command-line-tool-scheduler
             check-requirements
             inherit-requirements
             %command-line-tool-supported-requirements
-            scheduler-proc
-            scheduler-proc-name
-            scheduler-proc-cwl
-            scheduler-proc-scatter
-            scheduler-proc-scatter-method))
+            script->store-stdout-file
+            script->store-stderr-file
+            capture-command-line-tool-output))
 
 (define %store-files-directory
   "files")
@@ -85,18 +79,6 @@
 (define %worker-node
   (file-append node "/bin/node"))
 
-(define-immutable-record-type <scheduler-proc>
-  (scheduler-proc name cwl scatter scatter-method)
-  scheduler-proc?
-  (name scheduler-proc-name)
-  (cwl scheduler-proc-cwl)
-  (scatter scheduler-proc-scatter)
-  (scatter-method scheduler-proc-scatter-method))
-
-(define-condition-type &job-failure &error
-  job-failure job-failure?
-  (script job-failure-script))
-
 (define-immutable-record-type <formal-output>
   (formal-output id type binding)
   formal-output?
@@ -1071,97 +1053,3 @@ directory of the workflow."
                     #$scratch)))
                #$scratch)))))
     guix-daemon-socket))
-
-(define* (command-line-tool-scheduler manifest scratch store batch-system
-                                      #:key guix-daemon-socket
-                                      slurm-api-endpoint slurm-jwt)
-  (define (schedule proc inputs)
-    "Schedule @var{proc} with inputs from the @var{inputs} association list. Return a
-job state object."
-    (let ((name (scheduler-proc-name proc))
-          (cwl (scheduler-proc-cwl proc))
-          (scatter (from-maybe (scheduler-proc-scatter proc)
-                               #f))
-          (scatter-method (from-maybe (scheduler-proc-scatter-method proc)
-                                      #f)))
-      (if scatter
-          (case scatter-method
-            ((dot-product)
-             (apply vector-map
-                    (lambda input-elements
-                      ;; Recurse with scattered inputs spliced in.
-                      (schedule (scheduler-proc (scheduler-proc-name proc)
-                                                (scheduler-proc-cwl proc)
-                                                %nothing
-                                                %nothing)
-                                ;; Replace scattered inputs with single
-                                ;; elements.
-                                (apply assoc-set
-                                       inputs
-                                       (map cons
-                                            (vector->list scatter)
-                                            input-elements))))
-                    ;; Extract values of scattered inputs.
-                    (vector-map->list (cut assoc-ref inputs <>)
-                                      scatter)))
-            ((nested-cross-product flat-cross-product)
-             (error scatter-method
-                    "Scatter method not implemented yet")))
-          (run-command-line-tool name
-                                 manifest
-                                 cwl
-                                 inputs
-                                 scratch
-                                 store
-                                 batch-system
-                                 #:guix-daemon-socket guix-daemon-socket
-                                 #:slurm-api-endpoint slurm-api-endpoint
-                                 #:slurm-jwt slurm-jwt))))
-
-  (define (poll state)
-    "Return current status and updated state of job @var{state} object. The status is
-one of the symbols @code{pending} or @code{completed}. Raise an exception and
-exit if job has failed."
-    (guard (c ((job-failure? c)
-               (let ((script (job-failure-script c)))
-                 (user-error
-                  "~a failed; logs at ~a and ~a~%"
-                  script
-                  (script->store-stdout-file script store)
-                  (script->store-stderr-file script store)))))
-      (let ((status state (job-state-status state
-                                            #:slurm-api-endpoint slurm-api-endpoint
-                                            #:slurm-jwt slurm-jwt)))
-        (values (case status
-                  ((failed)
-                   (raise-exception (job-failure (job-state-script state))))
-                  (else => identity))
-                state))))
-
-  (define (capture-output state)
-    "Return output of completed job @var{state}."
-    (if (vector? state)
-        ;; Combine outputs from individual state elements.
-        (match (vector-map capture-output state)
-          ((and #(head-output _ ...)
-                outputs)
-           (map (match-lambda
-                  ((id . value)
-                   (cons id
-                         (vector-map (lambda (output)
-                                       ;; FIXME: Is this the correct way to
-                                       ;; handle missing outputs?
-                                       (or (assoc-ref output id)
-                                           'null))
-                                     outputs))))
-                head-output)))
-        ;; Log progress and return captured output.
-        (let ((script (job-state-script state)))
-          (format (current-error-port)
-                  "~a completed; logs at ~a and ~a~%"
-                  script
-                  (script->store-stdout-file script store)
-                  (script->store-stderr-file script store))
-          (capture-command-line-tool-output script store))))
-
-  (scheduler schedule poll capture-output))
diff --git a/ravanan/workflow.scm b/ravanan/workflow.scm
index 0249a7e..ff90dca 100644
--- a/ravanan/workflow.scm
+++ b/ravanan/workflow.scm
@@ -17,12 +17,16 @@
 ;;; along with ravanan.  If not, see <https://www.gnu.org/licenses/>.
 
 (define-module (ravanan workflow)
+  #:use-module ((rnrs conditions) #:select (define-condition-type))
+  #:use-module (rnrs exceptions)
   #:use-module (srfi srfi-1)
+  #:use-module (srfi srfi-9 gnu)
   #:use-module (srfi srfi-26)
   #:use-module (srfi srfi-71)
   #:use-module (ice-9 filesystem)
   #:use-module (ice-9 match)
   #:use-module (ravanan command-line-tool)
+  #:use-module (ravanan job-state)
   #:use-module (ravanan propnet)
   #:use-module (ravanan reader)
   #:use-module (ravanan work command-line-tool)
@@ -42,6 +46,18 @@
 (define %job-poll-interval
   5)
 
+(define-condition-type &job-failure &error
+  job-failure job-failure?
+  (script job-failure-script))
+
+(define-immutable-record-type <scheduler-proc>
+  (scheduler-proc name cwl scatter scatter-method)
+  scheduler-proc?
+  (name scheduler-proc-name)
+  (cwl scheduler-proc-cwl)
+  (scatter scheduler-proc-scatter)
+  (scatter-method scheduler-proc-scatter-method))
+
 (define (value=? maybe-val1 maybe-val2)
   "Return @code{#t} if maybe-monadic values @var{maybe-val1} and
 @var{maybe-val2} are the same value. Else, return @code{#f}."
@@ -249,6 +265,100 @@ their own namespaces."
        (error "Invalid workflow class" class)))
      name cwl)))
 
+(define* (workflow-scheduler manifest scratch store batch-system
+                             #:key guix-daemon-socket
+                             slurm-api-endpoint slurm-jwt)
+  (define (schedule proc inputs)
+    "Schedule @var{proc} with inputs from the @var{inputs} association list. Return a
+job state object."
+    (let ((name (scheduler-proc-name proc))
+          (cwl (scheduler-proc-cwl proc))
+          (scatter (from-maybe (scheduler-proc-scatter proc)
+                               #f))
+          (scatter-method (from-maybe (scheduler-proc-scatter-method proc)
+                                      #f)))
+      (if scatter
+          (case scatter-method
+            ((dot-product)
+             (apply vector-map
+                    (lambda input-elements
+                      ;; Recurse with scattered inputs spliced in.
+                      (schedule (scheduler-proc (scheduler-proc-name proc)
+                                                (scheduler-proc-cwl proc)
+                                                %nothing
+                                                %nothing)
+                                ;; Replace scattered inputs with single
+                                ;; elements.
+                                (apply assoc-set
+                                       inputs
+                                       (map cons
+                                            (vector->list scatter)
+                                            input-elements))))
+                    ;; Extract values of scattered inputs.
+                    (vector-map->list (cut assoc-ref inputs <>)
+                                      scatter)))
+            ((nested-cross-product flat-cross-product)
+             (error scatter-method
+                    "Scatter method not implemented yet")))
+          (run-command-line-tool name
+                                 manifest
+                                 cwl
+                                 inputs
+                                 scratch
+                                 store
+                                 batch-system
+                                 #:guix-daemon-socket guix-daemon-socket
+                                 #:slurm-api-endpoint slurm-api-endpoint
+                                 #:slurm-jwt slurm-jwt))))
+
+  (define (poll state)
+    "Return current status and updated state of job @var{state} object. The status is
+one of the symbols @code{pending} or @code{completed}. Raise an exception and
+exit if job has failed."
+    (guard (c ((job-failure? c)
+               (let ((script (job-failure-script c)))
+                 (user-error
+                  "~a failed; logs at ~a and ~a~%"
+                  script
+                  (script->store-stdout-file script store)
+                  (script->store-stderr-file script store)))))
+      (let ((status state (job-state-status state
+                                            #:slurm-api-endpoint slurm-api-endpoint
+                                            #:slurm-jwt slurm-jwt)))
+        (values (case status
+                  ((failed)
+                   (raise-exception (job-failure (job-state-script state))))
+                  (else => identity))
+                state))))
+
+  (define (capture-output state)
+    "Return output of completed job @var{state}."
+    (if (vector? state)
+        ;; Combine outputs from individual state elements.
+        (match (vector-map capture-output state)
+          ((and #(head-output _ ...)
+                outputs)
+           (map (match-lambda
+                  ((id . value)
+                   (cons id
+                         (vector-map (lambda (output)
+                                       ;; FIXME: Is this the correct way to
+                                       ;; handle missing outputs?
+                                       (or (assoc-ref output id)
+                                           'null))
+                                     outputs))))
+                head-output)))
+        ;; Log progress and return captured output.
+        (let ((script (job-state-script state)))
+          (format (current-error-port)
+                  "~a completed; logs at ~a and ~a~%"
+                  script
+                  (script->store-stdout-file script store)
+                  (script->store-stderr-file script store))
+          (capture-command-line-tool-output script store))))
+
+  (scheduler schedule poll capture-output))
+
 (define* (run-workflow name manifest cwl inputs
                        scratch store batch-system
                        #:key guix-daemon-socket
@@ -304,7 +414,7 @@ authenticate to the slurm API with. @var{slurm-api-endpoint} and
                      (propnet (workflow->propagators name cwl)
                               value=?
                               merge-values
-                              (command-line-tool-scheduler
+                              (workflow-scheduler
                                manifest scratch store batch-system
                                #:guix-daemon-socket guix-daemon-socket
                                #:slurm-api-endpoint slurm-api-endpoint