aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ravanan/command-line-tool.scm118
-rw-r--r--ravanan/workflow.scm112
2 files changed, 114 insertions, 116 deletions
diff --git a/ravanan/command-line-tool.scm b/ravanan/command-line-tool.scm
index 9878c95..3c4eab5 100644
--- a/ravanan/command-line-tool.scm
+++ b/ravanan/command-line-tool.scm
@@ -18,8 +18,6 @@
(define-module (ravanan command-line-tool)
#:use-module ((rnrs base) #:select (assertion-violation error))
- #:use-module ((rnrs conditions) #:select (define-condition-type))
- #:use-module (rnrs exceptions)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-9 gnu)
#:use-module (srfi srfi-26)
@@ -47,7 +45,6 @@
#:use-module (yaml)
#:use-module (ravanan glob)
#:use-module (ravanan job-state)
- #:use-module (ravanan propnet)
#:use-module (ravanan reader)
#:use-module (ravanan slurm-api)
#:use-module (ravanan work command-line-tool)
@@ -57,15 +54,12 @@
#:use-module (ravanan work utils)
#:use-module (ravanan work vectors)
#:export (run-command-line-tool
- command-line-tool-scheduler
check-requirements
inherit-requirements
%command-line-tool-supported-requirements
- scheduler-proc
- scheduler-proc-name
- scheduler-proc-cwl
- scheduler-proc-scatter
- scheduler-proc-scatter-method))
+ script->store-stdout-file
+ script->store-stderr-file
+ capture-command-line-tool-output))
(define %store-files-directory
"files")
@@ -85,18 +79,6 @@
(define %worker-node
(file-append node "/bin/node"))
-(define-immutable-record-type <scheduler-proc>
- (scheduler-proc name cwl scatter scatter-method)
- scheduler-proc?
- (name scheduler-proc-name)
- (cwl scheduler-proc-cwl)
- (scatter scheduler-proc-scatter)
- (scatter-method scheduler-proc-scatter-method))
-
-(define-condition-type &job-failure &error
- job-failure job-failure?
- (script job-failure-script))
-
(define-immutable-record-type <formal-output>
(formal-output id type binding)
formal-output?
@@ -1071,97 +1053,3 @@ directory of the workflow."
#$scratch)))
#$scratch)))))
guix-daemon-socket))
-
-(define* (command-line-tool-scheduler manifest scratch store batch-system
- #:key guix-daemon-socket
- slurm-api-endpoint slurm-jwt)
- (define (schedule proc inputs)
- "Schedule @var{proc} with inputs from the @var{inputs} association list. Return a
-job state object."
- (let ((name (scheduler-proc-name proc))
- (cwl (scheduler-proc-cwl proc))
- (scatter (from-maybe (scheduler-proc-scatter proc)
- #f))
- (scatter-method (from-maybe (scheduler-proc-scatter-method proc)
- #f)))
- (if scatter
- (case scatter-method
- ((dot-product)
- (apply vector-map
- (lambda input-elements
- ;; Recurse with scattered inputs spliced in.
- (schedule (scheduler-proc (scheduler-proc-name proc)
- (scheduler-proc-cwl proc)
- %nothing
- %nothing)
- ;; Replace scattered inputs with single
- ;; elements.
- (apply assoc-set
- inputs
- (map cons
- (vector->list scatter)
- input-elements))))
- ;; Extract values of scattered inputs.
- (vector-map->list (cut assoc-ref inputs <>)
- scatter)))
- ((nested-cross-product flat-cross-product)
- (error scatter-method
- "Scatter method not implemented yet")))
- (run-command-line-tool name
- manifest
- cwl
- inputs
- scratch
- store
- batch-system
- #:guix-daemon-socket guix-daemon-socket
- #:slurm-api-endpoint slurm-api-endpoint
- #:slurm-jwt slurm-jwt))))
-
- (define (poll state)
- "Return current status and updated state of job @var{state} object. The status is
-one of the symbols @code{pending} or @code{completed}. Raise an exception and
-exit if job has failed."
- (guard (c ((job-failure? c)
- (let ((script (job-failure-script c)))
- (user-error
- "~a failed; logs at ~a and ~a~%"
- script
- (script->store-stdout-file script store)
- (script->store-stderr-file script store)))))
- (let ((status state (job-state-status state
- #:slurm-api-endpoint slurm-api-endpoint
- #:slurm-jwt slurm-jwt)))
- (values (case status
- ((failed)
- (raise-exception (job-failure (job-state-script state))))
- (else => identity))
- state))))
-
- (define (capture-output state)
- "Return output of completed job @var{state}."
- (if (vector? state)
- ;; Combine outputs from individual state elements.
- (match (vector-map capture-output state)
- ((and #(head-output _ ...)
- outputs)
- (map (match-lambda
- ((id . value)
- (cons id
- (vector-map (lambda (output)
- ;; FIXME: Is this the correct way to
- ;; handle missing outputs?
- (or (assoc-ref output id)
- 'null))
- outputs))))
- head-output)))
- ;; Log progress and return captured output.
- (let ((script (job-state-script state)))
- (format (current-error-port)
- "~a completed; logs at ~a and ~a~%"
- script
- (script->store-stdout-file script store)
- (script->store-stderr-file script store))
- (capture-command-line-tool-output script store))))
-
- (scheduler schedule poll capture-output))
diff --git a/ravanan/workflow.scm b/ravanan/workflow.scm
index 0249a7e..ff90dca 100644
--- a/ravanan/workflow.scm
+++ b/ravanan/workflow.scm
@@ -17,12 +17,16 @@
;;; along with ravanan. If not, see <https://www.gnu.org/licenses/>.
(define-module (ravanan workflow)
+ #:use-module ((rnrs conditions) #:select (define-condition-type))
+ #:use-module (rnrs exceptions)
#:use-module (srfi srfi-1)
+ #:use-module (srfi srfi-9 gnu)
#:use-module (srfi srfi-26)
#:use-module (srfi srfi-71)
#:use-module (ice-9 filesystem)
#:use-module (ice-9 match)
#:use-module (ravanan command-line-tool)
+ #:use-module (ravanan job-state)
#:use-module (ravanan propnet)
#:use-module (ravanan reader)
#:use-module (ravanan work command-line-tool)
@@ -42,6 +46,18 @@
(define %job-poll-interval
5)
+(define-condition-type &job-failure &error
+ job-failure job-failure?
+ (script job-failure-script))
+
+(define-immutable-record-type <scheduler-proc>
+ (scheduler-proc name cwl scatter scatter-method)
+ scheduler-proc?
+ (name scheduler-proc-name)
+ (cwl scheduler-proc-cwl)
+ (scatter scheduler-proc-scatter)
+ (scatter-method scheduler-proc-scatter-method))
+
(define (value=? maybe-val1 maybe-val2)
"Return @code{#t} if maybe-monadic values @var{maybe-val1} and
@var{maybe-val2} are the same value. Else, return @code{#f}."
@@ -249,6 +265,100 @@ their own namespaces."
(error "Invalid workflow class" class)))
name cwl)))
+(define* (workflow-scheduler manifest scratch store batch-system
+ #:key guix-daemon-socket
+ slurm-api-endpoint slurm-jwt)
+ (define (schedule proc inputs)
+ "Schedule @var{proc} with inputs from the @var{inputs} association list. Return a
+job state object."
+ (let ((name (scheduler-proc-name proc))
+ (cwl (scheduler-proc-cwl proc))
+ (scatter (from-maybe (scheduler-proc-scatter proc)
+ #f))
+ (scatter-method (from-maybe (scheduler-proc-scatter-method proc)
+ #f)))
+ (if scatter
+ (case scatter-method
+ ((dot-product)
+ (apply vector-map
+ (lambda input-elements
+ ;; Recurse with scattered inputs spliced in.
+ (schedule (scheduler-proc (scheduler-proc-name proc)
+ (scheduler-proc-cwl proc)
+ %nothing
+ %nothing)
+ ;; Replace scattered inputs with single
+ ;; elements.
+ (apply assoc-set
+ inputs
+ (map cons
+ (vector->list scatter)
+ input-elements))))
+ ;; Extract values of scattered inputs.
+ (vector-map->list (cut assoc-ref inputs <>)
+ scatter)))
+ ((nested-cross-product flat-cross-product)
+ (error scatter-method
+ "Scatter method not implemented yet")))
+ (run-command-line-tool name
+ manifest
+ cwl
+ inputs
+ scratch
+ store
+ batch-system
+ #:guix-daemon-socket guix-daemon-socket
+ #:slurm-api-endpoint slurm-api-endpoint
+ #:slurm-jwt slurm-jwt))))
+
+ (define (poll state)
+ "Return current status and updated state of job @var{state} object. The status is
+one of the symbols @code{pending} or @code{completed}. Raise an exception and
+exit if job has failed."
+ (guard (c ((job-failure? c)
+ (let ((script (job-failure-script c)))
+ (user-error
+ "~a failed; logs at ~a and ~a~%"
+ script
+ (script->store-stdout-file script store)
+ (script->store-stderr-file script store)))))
+ (let ((status state (job-state-status state
+ #:slurm-api-endpoint slurm-api-endpoint
+ #:slurm-jwt slurm-jwt)))
+ (values (case status
+ ((failed)
+ (raise-exception (job-failure (job-state-script state))))
+ (else => identity))
+ state))))
+
+ (define (capture-output state)
+ "Return output of completed job @var{state}."
+ (if (vector? state)
+ ;; Combine outputs from individual state elements.
+ (match (vector-map capture-output state)
+ ((and #(head-output _ ...)
+ outputs)
+ (map (match-lambda
+ ((id . value)
+ (cons id
+ (vector-map (lambda (output)
+ ;; FIXME: Is this the correct way to
+ ;; handle missing outputs?
+ (or (assoc-ref output id)
+ 'null))
+ outputs))))
+ head-output)))
+ ;; Log progress and return captured output.
+ (let ((script (job-state-script state)))
+ (format (current-error-port)
+ "~a completed; logs at ~a and ~a~%"
+ script
+ (script->store-stdout-file script store)
+ (script->store-stderr-file script store))
+ (capture-command-line-tool-output script store))))
+
+ (scheduler schedule poll capture-output))
+
(define* (run-workflow name manifest cwl inputs
scratch store batch-system
#:key guix-daemon-socket
@@ -304,7 +414,7 @@ authenticate to the slurm API with. @var{slurm-api-endpoint} and
(propnet (workflow->propagators name cwl)
value=?
merge-values
- (command-line-tool-scheduler
+ (workflow-scheduler
manifest scratch store batch-system
#:guix-daemon-socket guix-daemon-socket
#:slurm-api-endpoint slurm-api-endpoint