From 7b9ac45eecebb454a166990987d80998df1643ca Mon Sep 17 00:00:00 2001 From: Arun Isaac Date: Tue, 24 Sep 2024 01:10:35 +0100 Subject: command-line-tool: Move scheduler to (ravanan workflow). * ravanan/command-line-tool.scm: Do not import (rnrs conditions), (rnrs exceptions) and (ravanan propnet). (command-line-tool-scheduler): Move to (ravanan workflow) as workflow-scheduler. (&job-failure, ): Move to (ravanan workflow). (script->store-stdout-file, script->store-stderr-file, capture-command-line-tool-output): Export functions. * ravanan/workflow.scm: Import define-condition-type from (rnrs conditions), (rnrs exceptions), (srfi srfi-9 gnu) and (ravanan job-state). (run-workflow): Call workflow-scheduler instead of command-line-tool-scheduler. --- ravanan/command-line-tool.scm | 118 ++---------------------------------------- ravanan/workflow.scm | 112 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 114 insertions(+), 116 deletions(-) diff --git a/ravanan/command-line-tool.scm b/ravanan/command-line-tool.scm index 9878c95..3c4eab5 100644 --- a/ravanan/command-line-tool.scm +++ b/ravanan/command-line-tool.scm @@ -18,8 +18,6 @@ (define-module (ravanan command-line-tool) #:use-module ((rnrs base) #:select (assertion-violation error)) - #:use-module ((rnrs conditions) #:select (define-condition-type)) - #:use-module (rnrs exceptions) #:use-module (srfi srfi-1) #:use-module (srfi srfi-9 gnu) #:use-module (srfi srfi-26) @@ -47,7 +45,6 @@ #:use-module (yaml) #:use-module (ravanan glob) #:use-module (ravanan job-state) - #:use-module (ravanan propnet) #:use-module (ravanan reader) #:use-module (ravanan slurm-api) #:use-module (ravanan work command-line-tool) @@ -57,15 +54,12 @@ #:use-module (ravanan work utils) #:use-module (ravanan work vectors) #:export (run-command-line-tool - command-line-tool-scheduler check-requirements inherit-requirements %command-line-tool-supported-requirements - scheduler-proc - scheduler-proc-name - scheduler-proc-cwl - scheduler-proc-scatter - scheduler-proc-scatter-method)) + script->store-stdout-file + script->store-stderr-file + capture-command-line-tool-output)) (define %store-files-directory "files") @@ -85,18 +79,6 @@ (define %worker-node (file-append node "/bin/node")) -(define-immutable-record-type - (scheduler-proc name cwl scatter scatter-method) - scheduler-proc? - (name scheduler-proc-name) - (cwl scheduler-proc-cwl) - (scatter scheduler-proc-scatter) - (scatter-method scheduler-proc-scatter-method)) - -(define-condition-type &job-failure &error - job-failure job-failure? - (script job-failure-script)) - (define-immutable-record-type (formal-output id type binding) formal-output? @@ -1071,97 +1053,3 @@ directory of the workflow." #$scratch))) #$scratch))))) guix-daemon-socket)) - -(define* (command-line-tool-scheduler manifest scratch store batch-system - #:key guix-daemon-socket - slurm-api-endpoint slurm-jwt) - (define (schedule proc inputs) - "Schedule @var{proc} with inputs from the @var{inputs} association list. Return a -job state object." - (let ((name (scheduler-proc-name proc)) - (cwl (scheduler-proc-cwl proc)) - (scatter (from-maybe (scheduler-proc-scatter proc) - #f)) - (scatter-method (from-maybe (scheduler-proc-scatter-method proc) - #f))) - (if scatter - (case scatter-method - ((dot-product) - (apply vector-map - (lambda input-elements - ;; Recurse with scattered inputs spliced in. - (schedule (scheduler-proc (scheduler-proc-name proc) - (scheduler-proc-cwl proc) - %nothing - %nothing) - ;; Replace scattered inputs with single - ;; elements. - (apply assoc-set - inputs - (map cons - (vector->list scatter) - input-elements)))) - ;; Extract values of scattered inputs. - (vector-map->list (cut assoc-ref inputs <>) - scatter))) - ((nested-cross-product flat-cross-product) - (error scatter-method - "Scatter method not implemented yet"))) - (run-command-line-tool name - manifest - cwl - inputs - scratch - store - batch-system - #:guix-daemon-socket guix-daemon-socket - #:slurm-api-endpoint slurm-api-endpoint - #:slurm-jwt slurm-jwt)))) - - (define (poll state) - "Return current status and updated state of job @var{state} object. The status is -one of the symbols @code{pending} or @code{completed}. Raise an exception and -exit if job has failed." - (guard (c ((job-failure? c) - (let ((script (job-failure-script c))) - (user-error - "~a failed; logs at ~a and ~a~%" - script - (script->store-stdout-file script store) - (script->store-stderr-file script store))))) - (let ((status state (job-state-status state - #:slurm-api-endpoint slurm-api-endpoint - #:slurm-jwt slurm-jwt))) - (values (case status - ((failed) - (raise-exception (job-failure (job-state-script state)))) - (else => identity)) - state)))) - - (define (capture-output state) - "Return output of completed job @var{state}." - (if (vector? state) - ;; Combine outputs from individual state elements. - (match (vector-map capture-output state) - ((and #(head-output _ ...) - outputs) - (map (match-lambda - ((id . value) - (cons id - (vector-map (lambda (output) - ;; FIXME: Is this the correct way to - ;; handle missing outputs? - (or (assoc-ref output id) - 'null)) - outputs)))) - head-output))) - ;; Log progress and return captured output. - (let ((script (job-state-script state))) - (format (current-error-port) - "~a completed; logs at ~a and ~a~%" - script - (script->store-stdout-file script store) - (script->store-stderr-file script store)) - (capture-command-line-tool-output script store)))) - - (scheduler schedule poll capture-output)) diff --git a/ravanan/workflow.scm b/ravanan/workflow.scm index 0249a7e..ff90dca 100644 --- a/ravanan/workflow.scm +++ b/ravanan/workflow.scm @@ -17,12 +17,16 @@ ;;; along with ravanan. If not, see . (define-module (ravanan workflow) + #:use-module ((rnrs conditions) #:select (define-condition-type)) + #:use-module (rnrs exceptions) #:use-module (srfi srfi-1) + #:use-module (srfi srfi-9 gnu) #:use-module (srfi srfi-26) #:use-module (srfi srfi-71) #:use-module (ice-9 filesystem) #:use-module (ice-9 match) #:use-module (ravanan command-line-tool) + #:use-module (ravanan job-state) #:use-module (ravanan propnet) #:use-module (ravanan reader) #:use-module (ravanan work command-line-tool) @@ -42,6 +46,18 @@ (define %job-poll-interval 5) +(define-condition-type &job-failure &error + job-failure job-failure? + (script job-failure-script)) + +(define-immutable-record-type + (scheduler-proc name cwl scatter scatter-method) + scheduler-proc? + (name scheduler-proc-name) + (cwl scheduler-proc-cwl) + (scatter scheduler-proc-scatter) + (scatter-method scheduler-proc-scatter-method)) + (define (value=? maybe-val1 maybe-val2) "Return @code{#t} if maybe-monadic values @var{maybe-val1} and @var{maybe-val2} are the same value. Else, return @code{#f}." @@ -249,6 +265,100 @@ their own namespaces." (error "Invalid workflow class" class))) name cwl))) +(define* (workflow-scheduler manifest scratch store batch-system + #:key guix-daemon-socket + slurm-api-endpoint slurm-jwt) + (define (schedule proc inputs) + "Schedule @var{proc} with inputs from the @var{inputs} association list. Return a +job state object." + (let ((name (scheduler-proc-name proc)) + (cwl (scheduler-proc-cwl proc)) + (scatter (from-maybe (scheduler-proc-scatter proc) + #f)) + (scatter-method (from-maybe (scheduler-proc-scatter-method proc) + #f))) + (if scatter + (case scatter-method + ((dot-product) + (apply vector-map + (lambda input-elements + ;; Recurse with scattered inputs spliced in. + (schedule (scheduler-proc (scheduler-proc-name proc) + (scheduler-proc-cwl proc) + %nothing + %nothing) + ;; Replace scattered inputs with single + ;; elements. + (apply assoc-set + inputs + (map cons + (vector->list scatter) + input-elements)))) + ;; Extract values of scattered inputs. + (vector-map->list (cut assoc-ref inputs <>) + scatter))) + ((nested-cross-product flat-cross-product) + (error scatter-method + "Scatter method not implemented yet"))) + (run-command-line-tool name + manifest + cwl + inputs + scratch + store + batch-system + #:guix-daemon-socket guix-daemon-socket + #:slurm-api-endpoint slurm-api-endpoint + #:slurm-jwt slurm-jwt)))) + + (define (poll state) + "Return current status and updated state of job @var{state} object. The status is +one of the symbols @code{pending} or @code{completed}. Raise an exception and +exit if job has failed." + (guard (c ((job-failure? c) + (let ((script (job-failure-script c))) + (user-error + "~a failed; logs at ~a and ~a~%" + script + (script->store-stdout-file script store) + (script->store-stderr-file script store))))) + (let ((status state (job-state-status state + #:slurm-api-endpoint slurm-api-endpoint + #:slurm-jwt slurm-jwt))) + (values (case status + ((failed) + (raise-exception (job-failure (job-state-script state)))) + (else => identity)) + state)))) + + (define (capture-output state) + "Return output of completed job @var{state}." + (if (vector? state) + ;; Combine outputs from individual state elements. + (match (vector-map capture-output state) + ((and #(head-output _ ...) + outputs) + (map (match-lambda + ((id . value) + (cons id + (vector-map (lambda (output) + ;; FIXME: Is this the correct way to + ;; handle missing outputs? + (or (assoc-ref output id) + 'null)) + outputs)))) + head-output))) + ;; Log progress and return captured output. + (let ((script (job-state-script state))) + (format (current-error-port) + "~a completed; logs at ~a and ~a~%" + script + (script->store-stdout-file script store) + (script->store-stderr-file script store)) + (capture-command-line-tool-output script store)))) + + (scheduler schedule poll capture-output)) + (define* (run-workflow name manifest cwl inputs scratch store batch-system #:key guix-daemon-socket @@ -304,7 +414,7 @@ authenticate to the slurm API with. @var{slurm-api-endpoint} and (propnet (workflow->propagators name cwl) value=? merge-values - (command-line-tool-scheduler + (workflow-scheduler manifest scratch store batch-system #:guix-daemon-socket guix-daemon-socket #:slurm-api-endpoint slurm-api-endpoint -- cgit v1.2.3