aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ravanan/command-line-tool.scm96
1 files changed, 74 insertions, 22 deletions
diff --git a/ravanan/command-line-tool.scm b/ravanan/command-line-tool.scm
index 5bb9dac..9541207 100644
--- a/ravanan/command-line-tool.scm
+++ b/ravanan/command-line-tool.scm
@@ -947,17 +947,44 @@ named @var{name} with @var{inputs} using tools from Guix manifest
"Schedule @var{proc} with inputs from the @var{inputs} association list. Return a
job state object."
(let ((name (scheduler-proc-name proc))
- (cwl (scheduler-proc-cwl proc)))
- (run-command-line-tool name
- manifest
- cwl
- inputs
- scratch
- store
- batch-system
- #:guix-daemon-socket guix-daemon-socket
- #:slurm-api-endpoint slurm-api-endpoint
- #:slurm-jwt slurm-jwt)))
+ (cwl (scheduler-proc-cwl proc))
+ (scatter (from-maybe (scheduler-proc-scatter proc)
+ #f))
+ (scatter-method (from-maybe (scheduler-proc-scatter-method proc)
+ #f)))
+ (if scatter
+ (case scatter-method
+ ((dot-product)
+ (apply vector-map
+ (lambda input-elements
+ ;; Recurse with scattered inputs spliced in.
+ (schedule (scheduler-proc (scheduler-proc-name proc)
+ (scheduler-proc-cwl proc)
+ %nothing
+ %nothing)
+ ;; Replace scattered inputs with single
+ ;; elements.
+ (apply assoc-set
+ inputs
+ (map cons
+ (vector->list scatter)
+ input-elements))))
+ ;; Extract values of scattered inputs.
+ (vector-map->list (cut assoc-ref inputs <>)
+ scatter)))
+ ((nested-cross-product flat-cross-product)
+ (raise-error scatter-method
+ "Scatter method not implemented yet")))
+ (run-command-line-tool name
+ manifest
+ cwl
+ inputs
+ scratch
+ store
+ batch-system
+ #:guix-daemon-socket guix-daemon-socket
+ #:slurm-api-endpoint slurm-api-endpoint
+ #:slurm-jwt slurm-jwt))))
(define (poll state)
"Return current status of job @var{state} object---one of the symbols
@@ -986,20 +1013,45 @@ failed."
((failed)
(raise-exception
(job-failure (slurm-job-state-script state))))
- (else => identity))))))
+ (else => identity)))
+ ;; For vector states, poll each state element and return 'completed only
+ ;; if all state elements have completed.
+ ((vector? state)
+ (or (vector-every (lambda (state-element)
+ (case (poll state-element)
+ ((completed) => identity)
+ (else #f)))
+ state)
+ 'pending)))))
(define (capture-output state)
"Return output of completed job @var{state}."
- (let ((script ((case batch-system
- ((single-machine) single-machine-job-state-script)
- ((slurm-api) slurm-job-state-script))
- state)))
- (format (current-error-port)
- "~a completed; logs at ~a and ~a~%"
- script
- (script->store-stdout-file script store)
- (script->store-stderr-file script store))
- (capture-command-line-tool-output script store)))
+ (if (vector? state)
+ ;; Combine outputs from individual state elements.
+ (match (vector-map capture-output state)
+ ((and #(head-output _ ...)
+ outputs)
+ (map (match-lambda
+ ((id . value)
+ (cons id
+ (vector-map (lambda (output)
+ ;; FIXME: Is this the correct way to
+ ;; handle missing outputs?
+ (or (assoc-ref output id)
+ 'null))
+ outputs))))
+ head-output)))
+ ;; Log progress and return captured output.
+ (let ((script ((case batch-system
+ ((single-machine) single-machine-job-state-script)
+ ((slurm-api) slurm-job-state-script))
+ state)))
+ (format (current-error-port)
+ "~a completed; logs at ~a and ~a~%"
+ script
+ (script->store-stdout-file script store)
+ (script->store-stderr-file script store))
+ (capture-command-line-tool-output script store))))
(scheduler schedule
poll