aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bh20seqanalyzer/main.py83
-rw-r--r--bh20sequploader/main.py3
-rw-r--r--setup.py2
3 files changed, 69 insertions, 19 deletions
diff --git a/bh20seqanalyzer/main.py b/bh20seqanalyzer/main.py
index 23e58e9..dae8eca 100644
--- a/bh20seqanalyzer/main.py
+++ b/bh20seqanalyzer/main.py
@@ -1,29 +1,70 @@
import argparse
import arvados
+import arvados.collection
import time
import subprocess
import tempfile
import json
+import logging
+
+logging.basicConfig(format="[%(asctime)s] %(levelname)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S",
+ level=logging.INFO)
+logging.getLogger("googleapiclient.discovery").setLevel(logging.WARN)
+
+def validate_upload(api, collection, validated_project):
+ col = arvados.collection.Collection(collection["uuid"])
+
+ # validate the collection here. Check metadata, etc.
+ valid = True
+
+ if "sequence.fasta" not in col:
+ valid = False
+ logging.warn("Upload '%s' missing sequence.fasta", collection["name"])
+ if "metadata.jsonld" not in col:
+ logging.warn("Upload '%s' missing metadata.jsonld", collection["name"])
+ valid = False
+
+ dup = api.collections().list(filters=[["owner_uuid", "=", validated_project],
+ ["portable_data_hash", "=", col.portable_data_hash()]]).execute()
+ if dup["items"]:
+ # This exact collection has been uploaded before.
+ valid = False
+ logging.warn("Upload '%s' is duplicate" % collection["name"])
+
+ if valid:
+ logging.info("Added '%s' to validated sequences" % collection["name"])
+ # Move it to the "validated" project to be included in the next analysis
+ api.collections().update(uuid=collection["uuid"], body={"owner_uuid": validated_project}).execute()
+ else:
+ # It is invalid, delete it.
+ logging.warn("Deleting '%s'" % collection["name"])
+ api.collections().delete(uuid=collection["uuid"]).execute()
+
+ return valid
+
+def start_analysis(api,
+ analysis_project,
+ workflow_uuid,
+ validated_project):
-def start_analysis(api, collection, analysis_project, workflow_uuid):
project = api.groups().create(body={
"group_class": "project",
- "name": "Analysis of %s" % collection["name"],
+ "name": "Pangenome analysis",
"owner_uuid": analysis_project,
}, ensure_unique_name=True).execute()
+ validated = arvados.util.list_all(api.collections().list, filters=[["owner_uuid", "=", validated_project]])
+
with tempfile.NamedTemporaryFile() as tmp:
- inputobj = json.dumps({
- "sequence": {
+ inputobj = {
+ "inputReads": []
+ }
+ for v in validated:
+ inputobj["inputReads"].append({
"class": "File",
- "location": "keep:%s/sequence.fasta" % collection["portable_data_hash"]
- },
- "metadata": {
- "class": "File",
- "location": "keep:%s/metadata.jsonld" % collection["portable_data_hash"]
- }
- }, indent=2)
- tmp.write(inputobj.encode('utf-8'))
+ "location": "keep:%s/sequence.fasta" % v["portable_data_hash"]
+ })
+ tmp.write(json.dumps(inputobj, indent=2).encode('utf-8'))
tmp.flush()
cmd = ["arvados-cwl-runner",
"--submit",
@@ -32,24 +73,32 @@ def start_analysis(api, collection, analysis_project, workflow_uuid):
"--project-uuid=%s" % project["uuid"],
"arvwf:%s" % workflow_uuid,
tmp.name]
- print("Running %s" % ' '.join(cmd))
+ logging.info("Running %s" % ' '.join(cmd))
comp = subprocess.run(cmd, capture_output=True)
if comp.returncode != 0:
- print(comp.stderr.decode('utf-8'))
- else:
- api.collections().update(uuid=collection["uuid"], body={"owner_uuid": project['uuid']}).execute()
+ logging.error(comp.stderr.decode('utf-8'))
+
def main():
parser = argparse.ArgumentParser(description='Analyze collections uploaded to a project')
parser.add_argument('--uploader-project', type=str, default='lugli-j7d0g-n5clictpuvwk8aa', help='')
parser.add_argument('--analysis-project', type=str, default='lugli-j7d0g-y4k4uswcqi3ku56', help='')
+ parser.add_argument('--validated-project', type=str, default='lugli-j7d0g-5ct8p1i1wrgyjvp', help='')
parser.add_argument('--workflow-uuid', type=str, default='lugli-7fd4e-mqfu9y3ofnpnho1', help='')
args = parser.parse_args()
api = arvados.api()
+ logging.info("Starting up, monitoring %s for uploads" % (args.uploader_project))
+
while True:
new_collections = api.collections().list(filters=[['owner_uuid', '=', args.uploader_project]]).execute()
+ at_least_one_new_valid_seq = False
for c in new_collections["items"]:
- start_analysis(api, c, args.analysis_project, args.workflow_uuid)
+ at_least_one_new_valid_seq = validate_upload(api, c, args.validated_project) or at_least_one_new_valid_seq
+
+ if at_least_one_new_valid_seq:
+ start_analysis(api, args.analysis_project,
+ args.workflow_uuid,
+ args.validated_project)
time.sleep(10)
diff --git a/bh20sequploader/main.py b/bh20sequploader/main.py
index 17ad492..d3ebc0c 100644
--- a/bh20sequploader/main.py
+++ b/bh20sequploader/main.py
@@ -49,4 +49,5 @@ def main():
(properties['upload_user'], properties['upload_ip']),
properties=properties, ensure_unique_name=True)
-main()
+if __name__ == "__main__":
+ main()
diff --git a/setup.py b/setup.py
index 9e73ff0..0685d37 100644
--- a/setup.py
+++ b/setup.py
@@ -6,7 +6,7 @@ import setuptools.command.egg_info as egg_info_cmd
from setuptools import setup
SETUP_DIR = os.path.dirname(__file__)
-README = os.path.join(SETUP_DIR, "README.rst")
+README = os.path.join(SETUP_DIR, "README.md")
try:
import gittaggers