aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Arun Isaac, 2025-01-11 00:55:42 +0000
committer: Arun Isaac, 2025-01-20 01:47:23 +0000
commit: 66e77e13bad82e622bbf26c36375d67057cfddc3 (patch)
tree: ae8262efaad6b90c092981909cd6d67288727b16
parent: a0cd2b00e42c8d6c5d414d029c8a2924ab95e5b5 (diff)
download: ravanan-66e77e13bad82e622bbf26c36375d67057cfddc3.tar.gz
ravanan-66e77e13bad82e622bbf26c36375d67057cfddc3.tar.lz
ravanan-66e77e13bad82e622bbf26c36375d67057cfddc3.zip
slurm: Make interface state-monadic.
* ravanan/slurm-api.scm: Import (ravanan work monads). (slurm-http-request, slurm-http-get, slurm-http-post, submit-job, job-state): Convert to state-monadic functions. * ravanan/command-line-tool.scm (run-command-line-tool): Update use of submit-job. * ravanan/job-state.scm: Import (ravanan work monads). (job-state-status): Update use of job-state.
-rw-r--r-- ravanan/command-line-tool.scm 27
-rw-r--r-- ravanan/job-state.scm 10
-rw-r--r-- ravanan/slurm-api.scm 145
3 files changed, 96 insertions, 86 deletions
diff --git a/ravanan/command-line-tool.scm b/ravanan/command-line-tool.scm
index f01c774..acab4af 100644
--- a/ravanan/command-line-tool.scm
+++ b/ravanan/command-line-tool.scm
@@ -453,19 +453,20 @@ path."
(format (current-error-port)
"Submitting job ~a~%"
script)
- (let ((job-id (slurm:submit-job `(("WORKFLOW_OUTPUT_DIRECTORY" .
- ,store-files-directory)
- ("WORKFLOW_OUTPUT_DATA_FILE" .
- ,store-data-file))
- stdout-file
- stderr-file
- cpus
- name
- script
- #:api-endpoint (slurm-api-batch-system-endpoint batch-system)
- #:jwt (slurm-api-batch-system-jwt batch-system)
- #:partition (slurm-api-batch-system-partition batch-system)
- #:nice (slurm-api-batch-system-nice batch-system))))
+ (let ((job-id (run-with-state
+ (slurm:submit-job `(("WORKFLOW_OUTPUT_DIRECTORY" .
+ ,store-files-directory)
+ ("WORKFLOW_OUTPUT_DATA_FILE" .
+ ,store-data-file))
+ stdout-file
+ stderr-file
+ cpus
+ name
+ script
+ #:api-endpoint (slurm-api-batch-system-endpoint batch-system)
+ #:jwt (slurm-api-batch-system-jwt batch-system)
+ #:partition (slurm-api-batch-system-partition batch-system)
+ #:nice (slurm-api-batch-system-nice batch-system)))))
(format (current-error-port)
"~a submitted as job ID ~a~%"
script
diff --git a/ravanan/job-state.scm b/ravanan/job-state.scm
index 834a5bd..3d620f2 100644
--- a/ravanan/job-state.scm
+++ b/ravanan/job-state.scm
@@ -1,5 +1,5 @@
;;; ravanan --- High-reproducibility CWL runner powered by Guix
-;;; Copyright © 2024 Arun Isaac <arunisaac@systemreboot.net>
+;;; Copyright © 2024, 2025 Arun Isaac <arunisaac@systemreboot.net>
;;;
;;; This file is part of ravanan.
;;;
@@ -28,6 +28,7 @@
#:use-module (srfi srfi-9 gnu)
#:use-module (ravanan batch-system)
#:use-module (ravanan slurm-api)
+ #:use-module (ravanan work monads)
#:use-module (ravanan work vectors)
#:export (single-machine-job-state
slurm-job-state
@@ -68,9 +69,10 @@
'failed))
;; Poll slurm for job state.
((slurm-job-state? state)
- (job-state (slurm-job-state-job-id state)
- #:api-endpoint (slurm-api-batch-system-endpoint batch-system)
- #:jwt (slurm-api-batch-system-jwt batch-system)))
+ (run-with-state
+ (job-state (slurm-job-state-job-id state)
+ #:api-endpoint (slurm-api-batch-system-endpoint batch-system)
+ #:jwt (slurm-api-batch-system-jwt batch-system))))
;; For vector states, poll each state element and return 'completed
;; only if all state elements have completed.
((vector? state)
diff --git a/ravanan/slurm-api.scm b/ravanan/slurm-api.scm
index ea56775..7c479ee 100644
--- a/ravanan/slurm-api.scm
+++ b/ravanan/slurm-api.scm
@@ -1,5 +1,5 @@
;;; ravanan --- High-reproducibility CWL runner powered by Guix
-;;; Copyright © 2024 Arun Isaac <arunisaac@systemreboot.net>
+;;; Copyright © 2024, 2025 Arun Isaac <arunisaac@systemreboot.net>
;;;
;;; This file is part of ravanan.
;;;
@@ -25,6 +25,7 @@
#:use-module (web client)
#:use-module (web uri)
#:use-module (json)
+ #:use-module (ravanan work monads)
#:use-module (ravanan work utils)
#:export (submit-job
job-state))
@@ -32,18 +33,19 @@
(define* (slurm-http-request api-endpoint jwt method path
#:key (headers '()) body)
"Make a HTTP request to @var{path} using @var{method} on a slurm
-@var{api-endpoint} authenticating using @var{jwt}. Pass body and
-additional @var{headers}."
- (let ((response body (http-request (build-uri (uri-scheme api-endpoint)
- #:host (uri-host api-endpoint)
- #:port (uri-port api-endpoint)
- #:path path)
- #:method method
- #:headers `((X-SLURM-USER-TOKEN . ,jwt)
- ,@headers)
- #:body body
- #:streaming? #t)))
- (json->scm body)))
+@var{api-endpoint} authenticating using @var{jwt}. Pass body and additional
+@var{headers}. Return the JSON response tree as a state-monadic value."
+ (state-return
+ (let ((response body (http-request (build-uri (uri-scheme api-endpoint)
+ #:host (uri-host api-endpoint)
+ #:port (uri-port api-endpoint)
+ #:path path)
+ #:method method
+ #:headers `((X-SLURM-USER-TOKEN . ,jwt)
+ ,@headers)
+ #:body body
+ #:streaming? #t)))
+ (json->scm body))))
(define (check-api-error json)
"Check @var{json} API response for errors, and raise an exception if any."
@@ -53,13 +55,15 @@ additional @var{headers}."
(define (slurm-http-get api-endpoint jwt path)
"Make a HTTP GET request to @var{path} on a slurm @var{api-endpoint}
-authenticating using @var{jwt}."
+authenticating using @var{jwt}. Return the JSON response tree as a state-monadic
+value."
(slurm-http-request api-endpoint jwt 'GET path))
(define (slurm-http-post api-endpoint jwt path body-scm)
"Make a HTTP POST request to @var{path} on a slurm @var{api-endpoint}
-authenticating using @var{jwt}. Convert @var{body-scm} to a JSON
-document and pass in as the body of the HTTP request."
+authenticating using @var{jwt}. Convert @var{body-scm} to a JSON document and
+pass in as the body of the HTTP request. Return the JSON response tree as a
+state-monadic value."
(slurm-http-request api-endpoint
jwt
'POST
@@ -77,7 +81,8 @@ of environment variables to set in the job. @var{stdout-file} and
@var{stderr-file} are files in which to write the stdout and stderr of the job
respectively. @var{cpus} is the number of CPUs (in slurm terminology, a CPU is a
hyperthread; see @url{https://slurm.schedmd.com/faq.html#cpu_count, the Slurm
-FAQ}) to request for the job."
+FAQ}) to request for the job. Return the slurm job ID of the submitted job as a
+state-monadic value."
(define job-spec
(append `(("name" . ,name)
("script" . ,(string-append "#!/bin/bash\n" script))
@@ -97,59 +102,61 @@ FAQ}) to request for the job."
`(("nice" . ,nice))
'())))
- (json-ref (check-api-error
- (slurm-http-post api-endpoint
- jwt
- "/slurm/v0.0.41/job/submit"
- `(("jobs" . #(,job-spec)))))
- "job_id"))
+ (state-let* ((json (slurm-http-post api-endpoint
+ jwt
+ "/slurm/v0.0.41/job/submit"
+ `(("jobs" . #(,job-spec))))))
+ (check-api-error json)
+ (state-return (json-ref json "job_id"))))
(define* (job-state job-id #:key api-endpoint jwt)
"Query the state of slurm @var{job-id} via @var{api-endpoint}
authenticating using @var{jwt}. Return value is one of the symbols
-@code{pending}, @code{failed} and @code{completed}."
- (let ((response (slurm-http-get api-endpoint
- jwt
- (string-append "/slurm/v0.0.41/job/"
- (number->string job-id)))))
- (match (json-ref response "errors")
- (#()
- (match (json-ref (find (lambda (job)
- (= (json-ref job "job_id")
- job-id))
- (vector->list (json-ref response "jobs")))
- "job_state")
- (#(job-state)
- (string->symbol (string-downcase job-state)))))
- (#(errors ...)
- ;; Check in slurmdbd if job has been completed and purged from
- ;; slurmctld's active memory.
- (match (find (lambda (error)
- (= (json-ref error "error_number")
- ;; Error number 2017 (Invalid job id specified) may
- ;; have occurred because the job has completed, has
- ;; exceeded MinJobAge (as set in slurm.conf) and has
- ;; therefore been purged from slurmctld's active
- ;; memory.
- 2017))
- errors)
- (error-2017
- (let ((response
- (check-api-error
- (slurm-http-get api-endpoint
- jwt
- (string-append "/slurmdb/v0.0.41/job/"
- (number->string job-id))))))
- (match (json-ref (find (lambda (job)
- (= (json-ref job "job_id")
- job-id))
- (vector->list (json-ref response "jobs")))
- "exit_code" "status")
- (#(job-state)
- ;; job-state is either "SUCCESS" or "ERROR".
- (if (eq? (string->symbol (string-downcase job-state))
- 'success)
- 'success
- 'failed)))))
- (#f
- (check-api-error response)))))))
+@code{pending}, @code{failed} and @code{completed} encapsulated in the state
+monad."
+  (state-let* ((response (slurm-http-get api-endpoint
+                                         jwt
+                                         (string-append "/slurm/v0.0.41/job/"
+                                                        (number->string job-id)))))
+    (match (json-ref response "errors")
+      (#()
+       (state-return
+        (match (json-ref (find (lambda (job)
+                                 (= (json-ref job "job_id")
+                                    job-id))
+                               (vector->list (json-ref response "jobs")))
+                         "job_state")
+          (#(job-state)
+           (string->symbol (string-downcase job-state))))))
+      (#(errors ...)
+       ;; Error number 2017 (Invalid job id specified) may have occurred
+       ;; because the job has completed, has exceeded MinJobAge (as set in
+       ;; slurm.conf) and has therefore been purged from slurmctld's active
+       ;; memory.
+       (if (find (lambda (error)
+                   (= (json-ref error "error_number")
+                      2017))
+                 errors)
+           ;; Check in slurmdbd for the purged job. slurm-http-get returns a
+           ;; state-monadic value; so bind it before inspecting the JSON.
+           (state-let* ((db-response
+                         (slurm-http-get api-endpoint
+                                         jwt
+                                         (string-append "/slurmdb/v0.0.41/job/"
+                                                        (number->string job-id)))))
+             (state-return
+              (match (json-ref (find (lambda (job)
+                                       (= (json-ref job "job_id")
+                                          job-id))
+                                     (vector->list
+                                      (json-ref (check-api-error db-response)
+                                                "jobs")))
+                               "exit_code" "status")
+                (#(job-state)
+                 ;; job-state is either "SUCCESS" or "ERROR".
+                 (if (eq? (string->symbol (string-downcase job-state))
+                          'success)
+                     'completed
+                     'failed)))))
+           ;; Any other error is a genuine API failure; raise it.
+           (state-return (check-api-error response)))))))