From afbc3ec99f638a2f8df96a8e952b5b9616dc99a8 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Tue, 7 Apr 2020 13:31:49 -0400 Subject: Now moves collections into 'validated sequences' project Improve logging for seq service Fix uploader bug Runs workflow with all validated sequences. --- bh20seqanalyzer/main.py | 83 +++++++++++++++++++++++++++++++++++++++---------- bh20sequploader/main.py | 3 +- setup.py | 2 +- 3 files changed, 69 insertions(+), 19 deletions(-) diff --git a/bh20seqanalyzer/main.py b/bh20seqanalyzer/main.py index 23e58e9..dae8eca 100644 --- a/bh20seqanalyzer/main.py +++ b/bh20seqanalyzer/main.py @@ -1,29 +1,70 @@ import argparse import arvados +import arvados.collection import time import subprocess import tempfile import json +import logging + +logging.basicConfig(format="[%(asctime)s] %(levelname)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S", + level=logging.INFO) +logging.getLogger("googleapiclient.discovery").setLevel(logging.WARN) + +def validate_upload(api, collection, validated_project): + col = arvados.collection.Collection(collection["uuid"]) + + # validate the collection here. Check metadata, etc. + valid = True + + if "sequence.fasta" not in col: + valid = False + logging.warn("Upload '%s' missing sequence.fasta", collection["name"]) + if "metadata.jsonld" not in col: + logging.warn("Upload '%s' missing metadata.jsonld", collection["name"]) + valid = False + + dup = api.collections().list(filters=[["owner_uuid", "=", validated_project], + ["portable_data_hash", "=", col.portable_data_hash()]]).execute() + if dup["items"]: + # This exact collection has been uploaded before. + valid = False + logging.warn("Upload '%s' is duplicate" % collection["name"]) + + if valid: + logging.info("Added '%s' to validated sequences" % collection["name"]) + # Move it to the "validated" project to be included in the next analysis + api.collections().update(uuid=collection["uuid"], body={"owner_uuid": validated_project}).execute() + else: + # It is invalid, delete it. + logging.warn("Deleting '%s'" % collection["name"]) + api.collections().delete(uuid=collection["uuid"]).execute() + + return valid + +def start_analysis(api, + analysis_project, + workflow_uuid, + validated_project): -def start_analysis(api, collection, analysis_project, workflow_uuid): project = api.groups().create(body={ "group_class": "project", - "name": "Analysis of %s" % collection["name"], + "name": "Pangenome analysis", "owner_uuid": analysis_project, }, ensure_unique_name=True).execute() + validated = arvados.util.list_all(api.collections().list, filters=[["owner_uuid", "=", validated_project]]) + with tempfile.NamedTemporaryFile() as tmp: - inputobj = json.dumps({ - "sequence": { + inputobj = { + "inputReads": [] + } + for v in validated: + inputobj["inputReads"].append({ "class": "File", - "location": "keep:%s/sequence.fasta" % collection["portable_data_hash"] - }, - "metadata": { - "class": "File", - "location": "keep:%s/metadata.jsonld" % collection["portable_data_hash"] - } - }, indent=2) - tmp.write(inputobj.encode('utf-8')) + "location": "keep:%s/sequence.fasta" % v["portable_data_hash"] + }) + tmp.write(json.dumps(inputobj, indent=2).encode('utf-8')) tmp.flush() cmd = ["arvados-cwl-runner", "--submit", @@ -32,24 +73,32 @@ def start_analysis(api, collection, analysis_project, workflow_uuid): "--project-uuid=%s" % project["uuid"], "arvwf:%s" % workflow_uuid, tmp.name] - print("Running %s" % ' '.join(cmd)) + logging.info("Running %s" % ' '.join(cmd)) comp = subprocess.run(cmd, capture_output=True) if comp.returncode != 0: - print(comp.stderr.decode('utf-8')) - else: - api.collections().update(uuid=collection["uuid"], body={"owner_uuid": project['uuid']}).execute() + logging.error(comp.stderr.decode('utf-8')) + def main(): parser = argparse.ArgumentParser(description='Analyze collections uploaded to a project') parser.add_argument('--uploader-project', type=str, default='lugli-j7d0g-n5clictpuvwk8aa', help='') parser.add_argument('--analysis-project', type=str, default='lugli-j7d0g-y4k4uswcqi3ku56', help='') + parser.add_argument('--validated-project', type=str, default='lugli-j7d0g-5ct8p1i1wrgyjvp', help='') parser.add_argument('--workflow-uuid', type=str, default='lugli-7fd4e-mqfu9y3ofnpnho1', help='') args = parser.parse_args() api = arvados.api() + logging.info("Starting up, monitoring %s for uploads" % (args.uploader_project)) + while True: new_collections = api.collections().list(filters=[['owner_uuid', '=', args.uploader_project]]).execute() + at_least_one_new_valid_seq = False for c in new_collections["items"]: - start_analysis(api, c, args.analysis_project, args.workflow_uuid) + at_least_one_new_valid_seq = validate_upload(api, c, args.validated_project) or at_least_one_new_valid_seq + + if at_least_one_new_valid_seq: + start_analysis(api, args.analysis_project, + args.workflow_uuid, + args.validated_project) time.sleep(10) diff --git a/bh20sequploader/main.py b/bh20sequploader/main.py index 17ad492..d3ebc0c 100644 --- a/bh20sequploader/main.py +++ b/bh20sequploader/main.py @@ -49,4 +49,5 @@ def main(): (properties['upload_user'], properties['upload_ip']), properties=properties, ensure_unique_name=True) -main() +if __name__ == "__main__": + main() diff --git a/setup.py b/setup.py index 9e73ff0..0685d37 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ import setuptools.command.egg_info as egg_info_cmd from setuptools import setup SETUP_DIR = os.path.dirname(__file__) -README = os.path.join(SETUP_DIR, "README.rst") +README = os.path.join(SETUP_DIR, "README.md") try: import gittaggers -- cgit v1.2.3