diff options
-rw-r--r-- | ravanan/command-line-tool.scm | 66 | ||||
-rw-r--r-- | ravanan/propnet.scm | 24 | ||||
-rw-r--r-- | ravanan/workflow.scm | 47 |
3 files changed, 73 insertions, 64 deletions
diff --git a/ravanan/command-line-tool.scm b/ravanan/command-line-tool.scm index 7da91e3..a5b800f 100644 --- a/ravanan/command-line-tool.scm +++ b/ravanan/command-line-tool.scm @@ -389,7 +389,8 @@ path." scratch store batch-system #:key guix-daemon-socket) "Run @code{CommandLineTool} class workflow @var{cwl} named @var{name} with -@var{inputs} using tools from Guix manifest in @var{manifest-file}. +@var{inputs} using tools from Guix manifest in @var{manifest-file}. Return a +state-monadic job state object. @var{channels}, @var{scratch}, @var{store}, @var{batch-system} and @var{guix-daemon-socket} are the same as in @code{run-workflow} from @@ -422,11 +423,12 @@ path." (if (file-exists? store-data-file) ;; Return a dummy success state object if script has already ;; been run successfully. - (begin - (format (current-error-port) - "~a previously run; retrieving result from store~%" - script) - (single-machine-job-state script #t)) + (state-return + (begin + (format (current-error-port) + "~a previously run; retrieving result from store~%" + script) + (single-machine-job-state script #t))) ;; Run script if it has not already been run. (begin ;; Delete output files directory if an incomplete one exists @@ -440,39 +442,35 @@ path." (mkdir store-files-directory) (cond ((eq? batch-system 'single-machine) - (single-machine-job-state script - (run-with-state - (single-machine:submit-job - `(("WORKFLOW_OUTPUT_DIRECTORY" . - ,store-files-directory) - ("WORKFLOW_OUTPUT_DATA_FILE" . - ,store-data-file)) - stdout-file - stderr-file - script)))) + (state-let* ((success? (single-machine:submit-job + `(("WORKFLOW_OUTPUT_DIRECTORY" . + ,store-files-directory) + ("WORKFLOW_OUTPUT_DATA_FILE" . + ,store-data-file)) + stdout-file + stderr-file + script))) + (state-return (single-machine-job-state script success?)))) ((slurm-api-batch-system? batch-system) - (format (current-error-port) - "Submitting job ~a~%" - script) - (let ((job-id (run-with-state - (slurm:submit-job `(("WORKFLOW_OUTPUT_DIRECTORY" . - ,store-files-directory) - ("WORKFLOW_OUTPUT_DATA_FILE" . - ,store-data-file)) - stdout-file - stderr-file - cpus - name - script - #:api-endpoint (slurm-api-batch-system-endpoint batch-system) - #:jwt (slurm-api-batch-system-jwt batch-system) - #:partition (slurm-api-batch-system-partition batch-system) - #:nice (slurm-api-batch-system-nice batch-system))))) + (state-let* ((job-id + (slurm:submit-job `(("WORKFLOW_OUTPUT_DIRECTORY" . + ,store-files-directory) + ("WORKFLOW_OUTPUT_DATA_FILE" . + ,store-data-file)) + stdout-file + stderr-file + cpus + name + script + #:api-endpoint (slurm-api-batch-system-endpoint batch-system) + #:jwt (slurm-api-batch-system-jwt batch-system) + #:partition (slurm-api-batch-system-partition batch-system) + #:nice (slurm-api-batch-system-nice batch-system)))) (format (current-error-port) "~a submitted as job ID ~a~%" script job-id) - (slurm-job-state script job-id))) + (state-return (slurm-job-state script job-id)))) (else (assertion-violation batch-system "Invalid batch system"))))))) diff --git a/ravanan/propnet.scm b/ravanan/propnet.scm index dfe61ac..da74dec 100644 --- a/ravanan/propnet.scm +++ b/ravanan/propnet.scm @@ -161,6 +161,21 @@ add to the inbox." (maybe-assoc-ref (just cells) cell-name)))) (propagator-inputs propagator)))) + (define (schedule-propagators propagators cells) + "Schedule all propagators among @var{propagators} whose inputs are present in +@var{cells}. Return an association list mapping scheduled propagator names to +their states." + (append-map (lambda (propagator) + (maybe-alist + (cons (propagator-name propagator) + (maybe-let* ((propagator-state + (activate-propagator + scheduler + propagator + (propagator-input-values cells propagator)))) + (just (run-with-state propagator-state)))))) + propagators)) + ;; We implement propagator networks as a state machine. The state consists of ;; the current values of all the cells and the list of all propagators ;; currently in flight. Each iteration of loop represents one state @@ -269,14 +284,7 @@ add to the inbox." ;; application of propnets, this will never result in the same ;; step being recomputed; so this approach does not come at a ;; higher computational cost. - (append (append-map (lambda (propagator) - (maybe-alist - (cons (propagator-name propagator) - (activate-propagator - scheduler - propagator - (propagator-input-values cells propagator))))) - propagators-inbox) + (append (schedule-propagators propagators-inbox cells) propagators-in-flight)))))))) (define (capture-propnet-output state) diff --git a/ravanan/workflow.scm b/ravanan/workflow.scm index 09d733c..1ef9284 100644 --- a/ravanan/workflow.scm +++ b/ravanan/workflow.scm @@ -1,5 +1,5 @@ ;;; ravanan --- High-reproducibility CWL runner powered by Guix -;;; Copyright © 2024 Arun Isaac <arunisaac@systemreboot.net> +;;; Copyright © 2024, 2025 Arun Isaac <arunisaac@systemreboot.net> ;;; ;;; This file is part of ravanan. ;;; @@ -288,26 +288,28 @@ job state object. @var{proc} may either be a @code{<propnet>} object or a store))) (cond ((string=? class "CommandLineTool") - (command-line-tool-state - (run-command-line-tool name - manifest-file - channels - cwl - inputs - scratch - store - batch-system - #:guix-daemon-socket guix-daemon-socket) - (assoc-ref* cwl "outputs"))) + (state-let* ((job-state + (run-command-line-tool name + manifest-file + channels + cwl + inputs + scratch + store + batch-system + #:guix-daemon-socket guix-daemon-socket))) + (state-return (command-line-tool-state job-state + (assoc-ref* cwl "outputs"))))) ((string=? class "ExpressionTool") (error "Workflow class not implemented yet" class)) ((string=? class "Workflow") - (workflow-state (schedule-propnet (workflow-class->propnet name - cwl - scheduler - batch-system) - inputs) - (assoc-ref* cwl "outputs")))))))) + (state-return + (workflow-state (schedule-propnet (workflow-class->propnet name + cwl + scheduler + batch-system) + inputs) + (assoc-ref* cwl "outputs"))))))))) (define (poll state) "Return current status and updated state of job @var{state} object. The status is @@ -610,10 +612,11 @@ area need not be shared. @var{store} is the path to the shared ravanan store. (let ((scheduler (workflow-scheduler manifest-file channels scratch store batch-system #:guix-daemon-socket guix-daemon-socket))) - (let loop ((state ((scheduler-schedule scheduler) - (scheduler-proc name cwl %nothing %nothing) - inputs - scheduler))) + (let loop ((state (run-with-state + ((scheduler-schedule scheduler) + (scheduler-proc name cwl %nothing %nothing) + inputs + scheduler)))) ;; Poll. (let ((status state ((scheduler-poll scheduler) state))) (if (eq? status 'pending) |