From afbc3ec99f638a2f8df96a8e952b5b9616dc99a8 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Tue, 7 Apr 2020 13:31:49 -0400 Subject: Now moves collections into 'validated sequences' project Improve logging for seq service Fix uploader bug Runs workflow with all validated sequences. --- bh20sequploader/main.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'bh20sequploader/main.py') diff --git a/bh20sequploader/main.py b/bh20sequploader/main.py index 17ad492..d3ebc0c 100644 --- a/bh20sequploader/main.py +++ b/bh20sequploader/main.py @@ -49,4 +49,5 @@ def main(): (properties['upload_user'], properties['upload_ip']), properties=properties, ensure_unique_name=True) -main() +if __name__ == "__main__": + main() -- cgit v1.2.3 From 07bc4c65535437b8e9e0744f08da8cea541d0116 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Tue, 7 Apr 2020 15:28:42 -0400 Subject: Add metadata validation with schema-salad --- bh20seqanalyzer/main.py | 11 ++++++++--- bh20sequploader/bh20seq-schema.yml | 36 ++++++++++++++++++++++++++++++++++++ bh20sequploader/main.py | 7 +++++-- bh20sequploader/qc_metadata.py | 26 +++++++++++++++++--------- example/dummyschema.yaml | 16 ---------------- setup.py | 3 ++- 6 files changed, 68 insertions(+), 31 deletions(-) create mode 100644 bh20sequploader/bh20seq-schema.yml delete mode 100644 example/dummyschema.yaml (limited to 'bh20sequploader/main.py') diff --git a/bh20seqanalyzer/main.py b/bh20seqanalyzer/main.py index 2513ea3..78e32c9 100644 --- a/bh20seqanalyzer/main.py +++ b/bh20seqanalyzer/main.py @@ -6,12 +6,14 @@ import subprocess import tempfile import json import logging +import ruamel.yaml +from bh20sequploader.qc_metadata import qc_metadata logging.basicConfig(format="[%(asctime)s] %(levelname)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO) logging.getLogger("googleapiclient.discovery").setLevel(logging.WARN) -def validate_upload(api, collection, validated_project, latest_result_uuid): +def validate_upload(api, collection, validated_project): col = arvados.collection.Collection(collection["uuid"]) # validate the collection here. Check metadata, etc. @@ -20,9 +22,12 @@ def validate_upload(api, collection, validated_project, latest_result_uuid): if "sequence.fasta" not in col: valid = False logging.warn("Upload '%s' missing sequence.fasta", collection["name"]) - if "metadata.jsonld" not in col: - logging.warn("Upload '%s' missing metadata.jsonld", collection["name"]) + if "metadata.yaml" not in col: + logging.warn("Upload '%s' missing metadata.yaml", collection["name"]) valid = False + else: + metadata_content = ruamel.yaml.round_trip_load(col.open("metadata.yaml")) + valid = qc_metadata(metadata_content) and valid dup = api.collections().list(filters=[["owner_uuid", "=", validated_project], ["portable_data_hash", "=", col.portable_data_hash()]]).execute() diff --git a/bh20sequploader/bh20seq-schema.yml b/bh20sequploader/bh20seq-schema.yml new file mode 100644 index 0000000..6e0973a --- /dev/null +++ b/bh20sequploader/bh20seq-schema.yml @@ -0,0 +1,36 @@ +$graph: + +- name: sampleInformationSchema + type: record + fields: + location: string + host: string + sequenceTechnology: string + assemblyMethod: string + +- name: InstituteInformationSchema + type: record + fields: + OriginatingLab: string + SubmittingLab: string + +- name: SubmitterInformationSchema + type: record + fields: + Submitter: string + submissionDate: string + +- name: VirusDetailSchema + type: record + fields: + VirusName: string + AccessionId: string + +- name: MainSchema + type: record + documentRoot: true + fields: + sampleInformation: sampleInformationSchema + InstituteInformation: InstituteInformationSchema + SubmitterInformation: SubmitterInformationSchema + VirusDetail: VirusDetailSchema diff --git a/bh20sequploader/main.py b/bh20sequploader/main.py index d3ebc0c..8b8fefe 100644 --- a/bh20sequploader/main.py +++ b/bh20sequploader/main.py @@ -6,6 +6,7 @@ import json import urllib.request import socket import getpass +from .qc_metadata import qc_metadata ARVADOS_API_HOST='lugli.arvadosapi.com' ARVADOS_API_TOKEN='2fbebpmbo3rw3x05ueu2i6nx70zhrsb1p22ycu3ry34m4x4462' @@ -19,6 +20,8 @@ def main(): api = arvados.api(host=ARVADOS_API_HOST, token=ARVADOS_API_TOKEN, insecure=True) + qc_metadata(args.metadata.name) + col = arvados.collection.Collection(api_client=api) print("Reading FASTA") @@ -29,8 +32,8 @@ def main(): f.write(r) r = args.sequence.read(65536) - print("Reading JSONLD") - with col.open("metadata.jsonld", "w") as f: + print("Reading metadata") + with col.open("metadata.yaml", "w") as f: r = args.metadata.read(65536) print(r[0:20]) while r: diff --git a/bh20sequploader/qc_metadata.py b/bh20sequploader/qc_metadata.py index 0632777..78b31b2 100644 --- a/bh20sequploader/qc_metadata.py +++ b/bh20sequploader/qc_metadata.py @@ -1,13 +1,21 @@ -import yamale +import schema_salad.schema +import logging +import pkg_resources -## NOTE: this is just a DUMMY. Everything about this can and will change def qc_metadata(metadatafile): - print("Start metadata validation...") - schema = yamale.make_schema('../example/dummyschema.yaml') - data = yamale.make_data(metadatafile) - # Validate data against the schema. Throws a ValueError if data is invalid. - yamale.validate(schema, data) - print("...complete!") + schema_resource = pkg_resources.resource_stream(__name__, "bh20seq-schema.yml") + cache = {"https://raw.githubusercontent.com/arvados/bh20-seq-resource/master/bh20sequploader/bh20seq-schema.yml": schema_resource.read().decode("utf-8")} + (document_loader, + avsc_names, + schema_metadata, + metaschema_loader) = schema_salad.schema.load_schema("https://raw.githubusercontent.com/arvados/bh20-seq-resource/master/bh20sequploader/bh20seq-schema.yml", cache=cache) -#qc_metadata("../example/metadata.yaml") + if not isinstance(avsc_names, schema_salad.avro.schema.Names): + print(avsc_names) + return False + try: + doc, metadata = schema_salad.schema.load_and_validate(document_loader, avsc_names, metadatafile, True) + return True + except: + return False diff --git a/example/dummyschema.yaml b/example/dummyschema.yaml deleted file mode 100644 index e428324..0000000 --- a/example/dummyschema.yaml +++ /dev/null @@ -1,16 +0,0 @@ -#sampleInformation: include('sampleInformation') -#InstituteInformation: include('InstituteInformation') ---- -sampleInformation: - location : str() - host : str() - sequenceTechnology: str() - assemblyMethod: str() - -InstituteInformation: - OriginatingLab: str() - SubmittingLab: str() - -VirusDetail: - VirusName: str() - AccessionId: str() diff --git a/setup.py b/setup.py index 0685d37..48c25aa 100644 --- a/setup.py +++ b/setup.py @@ -15,7 +15,7 @@ try: except ImportError: tagger = egg_info_cmd.egg_info -install_requires = ["arvados-python-client"] +install_requires = ["arvados-python-client", "schema-salad"] needs_pytest = {"pytest", "test", "ptr"}.intersection(sys.argv) pytest_runner = ["pytest < 6", "pytest-runner < 5"] if needs_pytest else [] @@ -30,6 +30,7 @@ setup( author_email="peter.amstutz@curii.com", license="Apache 2.0", packages=["bh20sequploader", "bh20seqanalyzer"], + package_data={"bh20sequploader": ["bh20seq-schema.yml"]}, install_requires=install_requires, setup_requires=[] + pytest_runner, tests_require=["pytest<5"], -- cgit v1.2.3 From 9458ed33da08c787c4bb20af7b4108c93334b351 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Wed, 8 Apr 2020 17:41:19 -0400 Subject: Fastq now runs through fastq2fasta pipeline then gets added to pangenome analysis. --- bh20seqanalyzer/main.py | 141 ++++++++++++++++++++++++++++++----------- bh20sequploader/main.py | 14 +++- bh20sequploader/qc_metadata.py | 6 +- 3 files changed, 120 insertions(+), 41 deletions(-) (limited to 'bh20sequploader/main.py') diff --git a/bh20seqanalyzer/main.py b/bh20seqanalyzer/main.py index 78e32c9..1a8965b 100644 --- a/bh20seqanalyzer/main.py +++ b/bh20seqanalyzer/main.py @@ -13,21 +13,30 @@ logging.basicConfig(format="[%(asctime)s] %(levelname)s %(message)s", datefmt="% level=logging.INFO) logging.getLogger("googleapiclient.discovery").setLevel(logging.WARN) -def validate_upload(api, collection, validated_project): +def validate_upload(api, collection, validated_project, + fastq_project, fastq_workflow_uuid): col = arvados.collection.Collection(collection["uuid"]) # validate the collection here. Check metadata, etc. valid = True - if "sequence.fasta" not in col: - valid = False - logging.warn("Upload '%s' missing sequence.fasta", collection["name"]) if "metadata.yaml" not in col: logging.warn("Upload '%s' missing metadata.yaml", collection["name"]) valid = False else: metadata_content = ruamel.yaml.round_trip_load(col.open("metadata.yaml")) - valid = qc_metadata(metadata_content) and valid + #valid = qc_metadata(metadata_content) and valid + if not valid: + logging.warn("Failed metadata qc") + + if valid: + if "sequence.fasta" not in col: + if "reads.fastq" in col: + start_fastq_to_fasta(api, collection, fastq_project, fastq_workflow_uuid) + return False + else: + valid = False + logging.warn("Upload '%s' missing sequence.fasta", collection["name"]) dup = api.collections().list(filters=[["owner_uuid", "=", validated_project], ["portable_data_hash", "=", col.portable_data_hash()]]).execute() @@ -39,7 +48,9 @@ def validate_upload(api, collection, validated_project): if valid: logging.info("Added '%s' to validated sequences" % collection["name"]) # Move it to the "validated" project to be included in the next analysis - api.collections().update(uuid=collection["uuid"], body={"owner_uuid": validated_project}).execute() + api.collections().update(uuid=collection["uuid"], body={ + "owner_uuid": validated_project, + "name": "%s (%s)" % (collection["name"], time.asctime(time.gmtime()))}).execute() else: # It is invalid, delete it. logging.warn("Deleting '%s'" % collection["name"]) @@ -47,28 +58,15 @@ def validate_upload(api, collection, validated_project): return valid -def start_analysis(api, - analysis_project, - workflow_uuid, - validated_project): +def run_workflow(api, parent_project, workflow_uuid, name, inputobj): project = api.groups().create(body={ "group_class": "project", - "name": "Pangenome analysis", - "owner_uuid": analysis_project, + "name": name, + "owner_uuid": parent_project, }, ensure_unique_name=True).execute() - validated = arvados.util.list_all(api.collections().list, filters=[["owner_uuid", "=", validated_project]]) - with tempfile.NamedTemporaryFile() as tmp: - inputobj = { - "inputReads": [] - } - for v in validated: - inputobj["inputReads"].append({ - "class": "File", - "location": "keep:%s/sequence.fasta" % v["portable_data_hash"] - }) tmp.write(json.dumps(inputobj, indent=2).encode('utf-8')) tmp.flush() cmd = ["arvados-cwl-runner", @@ -83,32 +81,95 @@ def start_analysis(api, if comp.returncode != 0: logging.error(comp.stderr.decode('utf-8')) + return project + + +def start_fastq_to_fasta(api, collection, + analysis_project, + fastq_workflow_uuid): + newproject = run_workflow(api, analysis_project, fastq_workflow_uuid, "FASTQ to FASTA", { + "fastq_forward": { + "class": "File", + "location": "keep:%s/reads.fastq" % collection["portable_data_hash"] + }, + "metadata": { + "class": "File", + "location": "keep:%s/metadata.yaml" % collection["portable_data_hash"] + }, + "ref_fasta": { + "class": "File", + "location": "keep:ffef6a3b77e5e04f8f62a7b6f67264d1+556/SARS-CoV2-NC_045512.2.fasta" + } + }) + api.collections().update(uuid=collection["uuid"], + body={"owner_uuid": newproject["uuid"]}).execute() + +def start_pangenome_analysis(api, + analysis_project, + pangenome_workflow_uuid, + validated_project): + validated = arvados.util.list_all(api.collections().list, filters=[["owner_uuid", "=", validated_project]]) + inputobj = { + "inputReads": [] + } + for v in validated: + inputobj["inputReads"].append({ + "class": "File", + "location": "keep:%s/sequence.fasta" % v["portable_data_hash"] + }) + run_workflow(api, analysis_project, pangenome_workflow_uuid, "Pangenome analysis", inputobj) + + +def get_workflow_output_from_project(api, uuid): + cr = api.container_requests().list(filters=[['owner_uuid', '=', uuid], + ["requesting_container_uuid", "=", None]]).execute() + if cr["items"] and cr["items"][0]["output_uuid"]: + return cr["items"][0] + else: + return None + def copy_most_recent_result(api, analysis_project, latest_result_uuid): most_recent_analysis = api.groups().list(filters=[['owner_uuid', '=', analysis_project]], order="created_at desc", limit=1).execute() for m in most_recent_analysis["items"]: - cr = api.container_requests().list(filters=[['owner_uuid', '=', m["uuid"]], - ["requesting_container_uuid", "=", None]]).execute() - if cr["items"] and cr["items"][0]["output_uuid"]: - wf = cr["items"][0] + wf = get_workflow_output_from_project(api, m["uuid"]) + if wf: src = api.collections().get(uuid=wf["output_uuid"]).execute() dst = api.collections().get(uuid=latest_result_uuid).execute() if src["portable_data_hash"] != dst["portable_data_hash"]: logging.info("Copying latest result from '%s' to %s", m["name"], latest_result_uuid) api.collections().update(uuid=latest_result_uuid, body={"manifest_text": src["manifest_text"], - "description": "latest result from %s %s" % (m["name"], wf["uuid"])}).execute() + "description": "Result from %s %s" % (m["name"], wf["uuid"])}).execute() break +def move_fastq_to_fasta_results(api, analysis_project, uploader_project): + projects = api.groups().list(filters=[['owner_uuid', '=', analysis_project], + ["properties.moved_output", "!=", True]], + order="created_at desc",).execute() + for p in projects["items"]: + wf = get_workflow_output_from_project(api, p["uuid"]) + if wf: + logging.info("Moving completed fastq2fasta result %s back to uploader project", wf["output_uuid"]) + api.collections().update(uuid=wf["output_uuid"], + body={"owner_uuid": uploader_project}).execute() + p["properties"]["moved_output"] = True + api.groups().update(uuid=p["uuid"], body={"properties": p["properties"]}).execute() + + def main(): parser = argparse.ArgumentParser(description='Analyze collections uploaded to a project') parser.add_argument('--uploader-project', type=str, default='lugli-j7d0g-n5clictpuvwk8aa', help='') - parser.add_argument('--analysis-project', type=str, default='lugli-j7d0g-y4k4uswcqi3ku56', help='') + parser.add_argument('--pangenome-analysis-project', type=str, default='lugli-j7d0g-y4k4uswcqi3ku56', help='') + parser.add_argument('--fastq-project', type=str, default='lugli-j7d0g-xcjxp4oox2u1w8u', help='') parser.add_argument('--validated-project', type=str, default='lugli-j7d0g-5ct8p1i1wrgyjvp', help='') - parser.add_argument('--workflow-uuid', type=str, default='lugli-7fd4e-mqfu9y3ofnpnho1', help='') - parser.add_argument('--latest-result-uuid', type=str, default='lugli-4zz18-z513nlpqm03hpca', help='') + + parser.add_argument('--pangenome-workflow-uuid', type=str, default='lugli-7fd4e-mqfu9y3ofnpnho1', help='') + parser.add_argument('--fastq-workflow-uuid', type=str, default='lugli-7fd4e-2zp9q4jo5xpif9y', help='') + + parser.add_argument('--latest-result-collection', type=str, default='lugli-4zz18-z513nlpqm03hpca', help='') args = parser.parse_args() api = arvados.api() @@ -116,16 +177,24 @@ def main(): logging.info("Starting up, monitoring %s for uploads" % (args.uploader_project)) while True: + move_fastq_to_fasta_results(api, args.fastq_project, args.uploader_project) + new_collections = api.collections().list(filters=[['owner_uuid', '=', args.uploader_project]]).execute() at_least_one_new_valid_seq = False for c in new_collections["items"]: - at_least_one_new_valid_seq = validate_upload(api, c, args.validated_project) or at_least_one_new_valid_seq + at_least_one_new_valid_seq = validate_upload(api, c, + args.validated_project, + args.fastq_project, + args.fastq_workflow_uuid) or at_least_one_new_valid_seq if at_least_one_new_valid_seq: - start_analysis(api, args.analysis_project, - args.workflow_uuid, - args.validated_project) + start_pangenome_analysis(api, + args.pangenome_analysis_project, + args.pangenome_workflow_uuid, + args.validated_project) - copy_most_recent_result(api, args.analysis_project, args.latest_result_uuid) + copy_most_recent_result(api, + args.pangenome_analysis_project, + args.latest_result_collection) - time.sleep(10) + time.sleep(15) diff --git a/bh20sequploader/main.py b/bh20sequploader/main.py index 8b8fefe..56cbe22 100644 --- a/bh20sequploader/main.py +++ b/bh20sequploader/main.py @@ -20,12 +20,18 @@ def main(): api = arvados.api(host=ARVADOS_API_HOST, token=ARVADOS_API_TOKEN, insecure=True) - qc_metadata(args.metadata.name) + if not qc_metadata(args.metadata.name): + print("Failed metadata qc") + exit(1) col = arvados.collection.Collection(api_client=api) - print("Reading FASTA") - with col.open("sequence.fasta", "w") as f: + if args.sequence.name.endswith("fasta") or args.sequence.name.endswith("fa"): + target = "sequence.fasta" + elif args.sequence.name.endswith("fastq") or args.sequence.name.endswith("fq"): + target = "reads.fastq" + + with col.open(target, "w") as f: r = args.sequence.read(65536) print(r[0:20]) while r: @@ -52,5 +58,7 @@ def main(): (properties['upload_user'], properties['upload_ip']), properties=properties, ensure_unique_name=True) + print("Done") + if __name__ == "__main__": main() diff --git a/bh20sequploader/qc_metadata.py b/bh20sequploader/qc_metadata.py index 78b31b2..ebe4dfc 100644 --- a/bh20sequploader/qc_metadata.py +++ b/bh20sequploader/qc_metadata.py @@ -1,6 +1,7 @@ import schema_salad.schema import logging import pkg_resources +import logging def qc_metadata(metadatafile): schema_resource = pkg_resources.resource_stream(__name__, "bh20seq-schema.yml") @@ -17,5 +18,6 @@ def qc_metadata(metadatafile): try: doc, metadata = schema_salad.schema.load_and_validate(document_loader, avsc_names, metadatafile, True) return True - except: - return False + except Exception as e: + logging.warn(e) + return False -- cgit v1.2.3 From bf93a6a2fec690eee4bff4891469cd5947102b3a Mon Sep 17 00:00:00 2001 From: Pjotr Prins Date: Thu, 9 Apr 2020 17:02:38 -0500 Subject: Moved Guix documentation into separate file (as it confused people ;) --- README.md | 21 +++++---------------- bh20sequploader/main.py | 2 +- doc/INSTALL.md | 31 +++++++++++++++++++++++++++++++ 3 files changed, 37 insertions(+), 17 deletions(-) create mode 100644 doc/INSTALL.md (limited to 'bh20sequploader/main.py') diff --git a/README.md b/README.md index a6fe052..3a8e5f0 100644 --- a/README.md +++ b/README.md @@ -122,19 +122,7 @@ It should print some instructions about how to use the uploader. ## Installation with GNU Guix -Another way to install this tool is inside a [GNU Guix Environment](https://guix.gnu.org/manual/en/html_node/Invoking-guix-environment.html), which can handle installing dependencies for you even when you don't have root access on an Ubuntu system. - -1. **Set up and enter a container with the necessary dependencies.** After installing Guix as `~/opt/guix/bin/guix`, run: - -```sh -~/opt/guix/bin/guix environment -C guix --ad-hoc git python openssl python-pycurl nss-certs -``` - -2. **Install the tool.** From there you can follow the [user installation instructions](#installation-with-pip3---user). In brief: - -```sh -pip3 install --user git+https://github.com/arvados/bh20-seq-resource.git@master -``` +For running/developing the uploader with GNU Guix see [INSTALL.md](./doc/INSTALL.md) # Usage @@ -148,7 +136,7 @@ bh20-seq-uploader example/sequence.fasta example/metadata.json All these uploaded sequences are being fed into a workflow to generate a [pangenome](https://academic.oup.com/bib/article/19/1/118/2566735) for the virus. You can replicate this workflow yourself. -Get your SARS-CoV-2 sequences from GenBank in `seqs.fa`, and then run: +An example is to get your SARS-CoV-2 sequences from GenBank in `seqs.fa`, and then run a series of commands ```sh minimap2 -cx asm20 -X seqs.fa seqs.fa >seqs.paf @@ -157,6 +145,7 @@ odgi build -g seqs.gfa -s -o seqs.odgi odgi viz -i seqs.odgi -o seqs.png -x 4000 -y 500 -R -P 5 ``` -For more information on building pangenome models, [see this wiki page](https://github.com/virtual-biohackathons/covid-19-bh20/wiki/Pangenome#pangenome-model-from-available-genomes). - +Here we convert such a pipeline into the Common Workflow Language (CWL) and +sources can be found [here](https://github.com/hpobio-lab/viral-analysis/tree/master/cwl/pangenome-generate). +For more information on building pangenome models, [see this wiki page](https://github.com/virtual-biohackathons/covid-19-bh20/wiki/Pangenome#pangenome-model-from-available-genomes). diff --git a/bh20sequploader/main.py b/bh20sequploader/main.py index 56cbe22..bf74ea5 100644 --- a/bh20sequploader/main.py +++ b/bh20sequploader/main.py @@ -6,7 +6,7 @@ import json import urllib.request import socket import getpass -from .qc_metadata import qc_metadata +import qc_metadata ARVADOS_API_HOST='lugli.arvadosapi.com' ARVADOS_API_TOKEN='2fbebpmbo3rw3x05ueu2i6nx70zhrsb1p22ycu3ry34m4x4462' diff --git a/doc/INSTALL.md b/doc/INSTALL.md new file mode 100644 index 0000000..c5c486c --- /dev/null +++ b/doc/INSTALL.md @@ -0,0 +1,31 @@ +# INSTALLATION + +Other options for running this tool. + +## GNU Guix + +Another way to install this tool is inside a [GNU Guix Environment](https://guix.gnu.org/manual/en/html_node/Invoking-guix-environment.html), which can handle installing dependencies for you even when you don't have root access on an Ubuntu system. + +1. **Set up and enter a container with the necessary dependencies.** After installing Guix as `~/opt/guix/bin/guix`, run: + +```sh +~/opt/guix/bin/guix environment -C guix --ad-hoc git python openssl python-pycurl nss-certs +``` + +2. **Install the tool.** From there you can follow the [user installation instructions](#installation-with-pip3---user). In brief: + +```sh +pip3 install --user schema-salad arvados-python-client +``` + +Pip installed the following modules + +``` +arvados-python-client-2.0.1 ciso8601-2.1.3 future-0.18.2 google-api-python-client-1.6.7 httplib2-0.17.1 oauth2client-4.1.3 pyasn1-0.4.8 pyasn1-modules-0.2.8 rsa-4.0 ruamel.yaml-0.15.77 six-1.14.0 uritemplate-3.0.1 ws4py-0.5.1 +``` + +3. Run the tool directly with + +```sh +~/opt/guix/bin/guix environment guix --ad-hoc git python openssl python-pycurl nss-certs -- python3 bh20sequploader/main.py +``` -- cgit v1.2.3 From bef2a43185f9494398f5d5a8cdb6c5f34352f912 Mon Sep 17 00:00:00 2001 From: Alex Kanitz Date: Fri, 10 Apr 2020 18:27:44 +0200 Subject: validate seq format with magic file --- bh20sequploader/main.py | 29 +++++++++++++++++++++++++---- bh20sequploader/validation/Makefile | 4 ++++ bh20sequploader/validation/formats | 4 ++++ bh20sequploader/validation/formats.mgc | Bin 0 -> 1032 bytes 4 files changed, 33 insertions(+), 4 deletions(-) create mode 100644 bh20sequploader/validation/Makefile create mode 100644 bh20sequploader/validation/formats create mode 100644 bh20sequploader/validation/formats.mgc (limited to 'bh20sequploader/main.py') diff --git a/bh20sequploader/main.py b/bh20sequploader/main.py index bf74ea5..1d5b9c3 100644 --- a/bh20sequploader/main.py +++ b/bh20sequploader/main.py @@ -3,6 +3,8 @@ import time import arvados import arvados.collection import json +import magic +from pathlib import Path import urllib.request import socket import getpass @@ -14,7 +16,7 @@ UPLOAD_PROJECT='lugli-j7d0g-n5clictpuvwk8aa' def main(): parser = argparse.ArgumentParser(description='Upload SARS-CoV-19 sequences for analysis') - parser.add_argument('sequence', type=argparse.FileType('r'), help='sequence FASTA') + parser.add_argument('sequence', type=argparse.FileType('r'), help='sequence FASTA/FASTQ') parser.add_argument('metadata', type=argparse.FileType('r'), help='sequence metadata json') args = parser.parse_args() @@ -26,10 +28,27 @@ def main(): col = arvados.collection.Collection(api_client=api) - if args.sequence.name.endswith("fasta") or args.sequence.name.endswith("fa"): - target = "sequence.fasta" - elif args.sequence.name.endswith("fastq") or args.sequence.name.endswith("fq"): + magic_file = Path(__file__).parent / "validation" / "formats.mgc" + val = magic.Magic(magic_file=magic_file.resolve().as_posix(), + uncompress=False, mime=True) + seq_type = val.from_file(args.sequence.name).lower() + print(f"Sequence type: {seq_type}") + if seq_type == "text/fasta": + # ensure that contains only one entry + entries = 0 + for line in args.sequence: + if line.startswith(">"): + entries += 1 + if entries > 1: + raise ValueError("FASTA file contains multiple entries") + break + args.sequence.close() + args.sequence = open(args.sequence.name, "r") target = "reads.fastq" + elif seq_type == "text/fastq": + target = "sequence.fasta" + else: + raise ValueError("Sequence file does not look like FASTA or FASTQ") with col.open(target, "w") as f: r = args.sequence.read(65536) @@ -37,6 +56,7 @@ def main(): while r: f.write(r) r = args.sequence.read(65536) + args.sequence.close() print("Reading metadata") with col.open("metadata.yaml", "w") as f: @@ -45,6 +65,7 @@ def main(): while r: f.write(r) r = args.metadata.read(65536) + args.metadata.close() external_ip = urllib.request.urlopen('https://ident.me').read().decode('utf8') diff --git a/bh20sequploader/validation/Makefile b/bh20sequploader/validation/Makefile new file mode 100644 index 0000000..1ca13fb --- /dev/null +++ b/bh20sequploader/validation/Makefile @@ -0,0 +1,4 @@ +compile: formats.mgc + +formats.mgc : + file -C -m formats diff --git a/bh20sequploader/validation/formats b/bh20sequploader/validation/formats new file mode 100644 index 0000000..ac804cf --- /dev/null +++ b/bh20sequploader/validation/formats @@ -0,0 +1,4 @@ +0 regex \^\>.+\r?\n([acgtnACGTN]+\r?\n)*[acgtnACGTN]+(\r?\n)?$ FASTA +!:mime text/fasta +0 regex \^@.+\r?\n[acgtnACGTN]*\n\\+.*\n[!-i]*(\r\n)? FASTQ +!:mime text/fastq \ No newline at end of file diff --git a/bh20sequploader/validation/formats.mgc b/bh20sequploader/validation/formats.mgc new file mode 100644 index 0000000..bff282a Binary files /dev/null and b/bh20sequploader/validation/formats.mgc differ -- cgit v1.2.3 From 9b7fbc52ae229d72e75de9f433eea00ce37ba70a Mon Sep 17 00:00:00 2001 From: Pjotr Prins Date: Fri, 10 Apr 2020 11:52:18 -0500 Subject: Fixed TypeError: 'module' object is not callable --- bh20sequploader/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'bh20sequploader/main.py') diff --git a/bh20sequploader/main.py b/bh20sequploader/main.py index bf74ea5..ede9f38 100644 --- a/bh20sequploader/main.py +++ b/bh20sequploader/main.py @@ -6,7 +6,7 @@ import json import urllib.request import socket import getpass -import qc_metadata +from qc_metadata import qc_metadata ARVADOS_API_HOST='lugli.arvadosapi.com' ARVADOS_API_TOKEN='2fbebpmbo3rw3x05ueu2i6nx70zhrsb1p22ycu3ry34m4x4462' -- cgit v1.2.3 From a6ba9a5203a568611a94c043fd13e2ec50f071da Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Fri, 10 Apr 2020 15:12:08 -0400 Subject: Validate & propagate metadata --- bh20seqanalyzer/main.py | 45 ++++++++++++++++++++++++++++++++------ bh20sequploader/bh20seq-schema.yml | 6 ++--- bh20sequploader/main.py | 2 +- bh20sequploader/qc_metadata.py | 29 +++++++++++++++++++++++- 4 files changed, 70 insertions(+), 12 deletions(-) (limited to 'bh20sequploader/main.py') diff --git a/bh20seqanalyzer/main.py b/bh20seqanalyzer/main.py index 2030c1e..1fb51b5 100644 --- a/bh20seqanalyzer/main.py +++ b/bh20seqanalyzer/main.py @@ -8,6 +8,8 @@ import json import logging import ruamel.yaml from bh20sequploader.qc_metadata import qc_metadata +import pkg_resources +from schema_salad.sourceline import add_lc_filename logging.basicConfig(format="[%(asctime)s] %(levelname)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO) @@ -24,8 +26,14 @@ def validate_upload(api, collection, validated_project, logging.warn("Upload '%s' missing metadata.yaml", collection["name"]) valid = False else: - metadata_content = ruamel.yaml.round_trip_load(col.open("metadata.yaml")) - #valid = qc_metadata(metadata_content) and valid + try: + metadata_content = ruamel.yaml.round_trip_load(col.open("metadata.yaml")) + metadata_content["id"] = "keep:%s/metadata.yaml" % collection["portable_data_hash"] + add_lc_filename(metadata_content, metadata_content["id"]) + valid = qc_metadata(metadata_content) and valid + except Exception as e: + logging.warn(e) + valid = False if not valid: logging.warn("Failed metadata qc") @@ -52,9 +60,10 @@ def validate_upload(api, collection, validated_project, "owner_uuid": validated_project, "name": "%s (%s)" % (collection["name"], time.asctime(time.gmtime()))}).execute() else: + pass # It is invalid, delete it. - logging.warn("Deleting '%s'" % collection["name"]) - api.collections().delete(uuid=collection["uuid"]).execute() + #logging.warn("Deleting '%s'" % collection["name"]) + #api.collections().delete(uuid=collection["uuid"]).execute() return valid @@ -107,12 +116,17 @@ def start_fastq_to_fasta(api, collection, def start_pangenome_analysis(api, analysis_project, pangenome_workflow_uuid, - validated_project): + validated_project, + schema_ref): validated = arvados.util.list_all(api.collections().list, filters=[["owner_uuid", "=", validated_project]]) inputobj = { "inputReads": [], "metadata": [], - "subjects": [] + "subjects": [], + "metadataSchema": { + "class": "File", + "location": schema_ref + } } for v in validated: inputobj["inputReads"].append({ @@ -166,12 +180,26 @@ def move_fastq_to_fasta_results(api, analysis_project, uploader_project): api.groups().update(uuid=p["uuid"], body={"properties": p["properties"]}).execute() +def upload_schema(api, workflow_def_project): + schema_resource = pkg_resources.resource_stream('bh20sequploader.qc_metadata', "bh20seq-schema.yml") + c = arvados.collection.Collection() + with c.open("schema.yml", "wb") as f: + f.write(schema_resource.read()) + pdh = c.portable_data_hash() + wd = api.collections().list(filters=[["owner_uuid", "=", workflow_def_project], + ["portable_data_hash", "=", pdh]]).execute() + if len(wd["items"]) == 0: + c.save_new(owner_uuid=workflow_def_project, name="Metadata schema", ensure_unique_name=True) + return "keep:%s/schema.yml" % pdh + + def main(): parser = argparse.ArgumentParser(description='Analyze collections uploaded to a project') parser.add_argument('--uploader-project', type=str, default='lugli-j7d0g-n5clictpuvwk8aa', help='') parser.add_argument('--pangenome-analysis-project', type=str, default='lugli-j7d0g-y4k4uswcqi3ku56', help='') parser.add_argument('--fastq-project', type=str, default='lugli-j7d0g-xcjxp4oox2u1w8u', help='') parser.add_argument('--validated-project', type=str, default='lugli-j7d0g-5ct8p1i1wrgyjvp', help='') + parser.add_argument('--workflow-def-project', type=str, default='lugli-j7d0g-5hswinmpyho8dju', help='') parser.add_argument('--pangenome-workflow-uuid', type=str, default='lugli-7fd4e-mqfu9y3ofnpnho1', help='') parser.add_argument('--fastq-workflow-uuid', type=str, default='lugli-7fd4e-2zp9q4jo5xpif9y', help='') @@ -183,6 +211,8 @@ def main(): logging.info("Starting up, monitoring %s for uploads" % (args.uploader_project)) + schema_ref = upload_schema(api, args.workflow_def_project) + while True: move_fastq_to_fasta_results(api, args.fastq_project, args.uploader_project) @@ -198,7 +228,8 @@ def main(): start_pangenome_analysis(api, args.pangenome_analysis_project, args.pangenome_workflow_uuid, - args.validated_project) + args.validated_project, + schema_ref) copy_most_recent_result(api, args.pangenome_analysis_project, diff --git a/bh20sequploader/bh20seq-schema.yml b/bh20sequploader/bh20seq-schema.yml index cf9b015..a072bd7 100644 --- a/bh20sequploader/bh20seq-schema.yml +++ b/bh20sequploader/bh20seq-schema.yml @@ -174,9 +174,9 @@ $graph: jsonldPredicate: _id: "@id" #_type: "@id" - sequencefile: - doc: The subject (eg the fasta/fastq file) that this metadata describes + id: + doc: The subject (eg the fasta/fastq file) that the metadata describes type: string? jsonldPredicate: _id: "@id" - _type: "@id" \ No newline at end of file + _type: "@id" diff --git a/bh20sequploader/main.py b/bh20sequploader/main.py index bfb8c51..2032508 100644 --- a/bh20sequploader/main.py +++ b/bh20sequploader/main.py @@ -22,7 +22,7 @@ def main(): api = arvados.api(host=ARVADOS_API_HOST, token=ARVADOS_API_TOKEN, insecure=True) - if not qc_metadata(args.metadata.name): + if not bh20sequploader.qc_metadata.qc_metadata(args.metadata.name): print("Failed metadata qc") exit(1) diff --git a/bh20sequploader/qc_metadata.py b/bh20sequploader/qc_metadata.py index ebe4dfc..38edcaa 100644 --- a/bh20sequploader/qc_metadata.py +++ b/bh20sequploader/qc_metadata.py @@ -1,12 +1,25 @@ import schema_salad.schema +import schema_salad.ref_resolver import logging import pkg_resources import logging +import traceback + +class CustomFetcher(schema_salad.ref_resolver.DefaultFetcher): + def check_exists(sup, url): + if url.startswith("keep:"): + return True + else: + return super().check_exists(url) + + def supported_schemes(self): # type: () -> List[str] + return ["file", "http", "https", "mailto", "keep"] + def qc_metadata(metadatafile): schema_resource = pkg_resources.resource_stream(__name__, "bh20seq-schema.yml") cache = {"https://raw.githubusercontent.com/arvados/bh20-seq-resource/master/bh20sequploader/bh20seq-schema.yml": schema_resource.read().decode("utf-8")} - (document_loader, + (loader, avsc_names, schema_metadata, metaschema_loader) = schema_salad.schema.load_schema("https://raw.githubusercontent.com/arvados/bh20-seq-resource/master/bh20sequploader/bh20seq-schema.yml", cache=cache) @@ -15,9 +28,23 @@ def qc_metadata(metadatafile): print(avsc_names) return False + document_loader = schema_salad.ref_resolver.Loader( + loader.ctx, + schemagraph=loader.graph, + foreign_properties=loader.foreign_properties, + idx=loader.idx, + cache=loader.cache, + fetcher_constructor=CustomFetcher, + skip_schemas=loader.skip_schemas, + url_fields=loader.url_fields, + allow_attachments=loader.allow_attachments, + session=loader.session, + ) + try: doc, metadata = schema_salad.schema.load_and_validate(document_loader, avsc_names, metadatafile, True) return True except Exception as e: + traceback.print_exc() logging.warn(e) return False -- cgit v1.2.3 From 925058d0b3db70803d322cc2a33801240899a20a Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Fri, 10 Apr 2020 15:52:37 -0400 Subject: Fix up fasta/fastq validation --- bh20seqanalyzer/main.py | 9 ++++++++- bh20sequploader/main.py | 29 +++++------------------------ setup.py | 5 +++-- 3 files changed, 16 insertions(+), 27 deletions(-) (limited to 'bh20sequploader/main.py') diff --git a/bh20seqanalyzer/main.py b/bh20seqanalyzer/main.py index 1fb51b5..c05b402 100644 --- a/bh20seqanalyzer/main.py +++ b/bh20seqanalyzer/main.py @@ -8,6 +8,7 @@ import json import logging import ruamel.yaml from bh20sequploader.qc_metadata import qc_metadata +from bh20sequploader.qc_fasta import qc_fasta import pkg_resources from schema_salad.sourceline import add_lc_filename @@ -38,7 +39,13 @@ def validate_upload(api, collection, validated_project, logging.warn("Failed metadata qc") if valid: - if "sequence.fasta" not in col: + if "sequence.fasta" in col: + try: + qc_fasta(col.open("sequence.fasta")) + except Exception as e: + logging.warn(e) + valid = False + else: if "reads.fastq" in col: start_fastq_to_fasta(api, collection, fastq_project, fastq_workflow_uuid) return False diff --git a/bh20sequploader/main.py b/bh20sequploader/main.py index 2032508..4a225f6 100644 --- a/bh20sequploader/main.py +++ b/bh20sequploader/main.py @@ -8,7 +8,8 @@ from pathlib import Path import urllib.request import socket import getpass -from qc_metadata import qc_metadata +from .qc_metadata import qc_metadata +from .qc_fasta import qc_fasta ARVADOS_API_HOST='lugli.arvadosapi.com' ARVADOS_API_TOKEN='2fbebpmbo3rw3x05ueu2i6nx70zhrsb1p22ycu3ry34m4x4462' @@ -22,34 +23,14 @@ def main(): api = arvados.api(host=ARVADOS_API_HOST, token=ARVADOS_API_TOKEN, insecure=True) - if not bh20sequploader.qc_metadata.qc_metadata(args.metadata.name): + target = qc_fasta(args.sequence) + + if not qc_metadata(args.metadata.name): print("Failed metadata qc") exit(1) col = arvados.collection.Collection(api_client=api) - magic_file = Path(__file__).parent / "validation" / "formats.mgc" - val = magic.Magic(magic_file=magic_file.resolve().as_posix(), - uncompress=False, mime=True) - seq_type = val.from_file(args.sequence.name).lower() - print(f"Sequence type: {seq_type}") - if seq_type == "text/fasta": - # ensure that contains only one entry - entries = 0 - for line in args.sequence: - if line.startswith(">"): - entries += 1 - if entries > 1: - raise ValueError("FASTA file contains multiple entries") - break - args.sequence.close() - args.sequence = open(args.sequence.name, "r") - target = "reads.fastq" - elif seq_type == "text/fastq": - target = "sequence.fasta" - else: - raise ValueError("Sequence file does not look like FASTA or FASTQ") - with col.open(target, "w") as f: r = args.sequence.read(65536) print(r[0:20]) diff --git a/setup.py b/setup.py index 41ace7b..18e858e 100644 --- a/setup.py +++ b/setup.py @@ -15,7 +15,7 @@ try: except ImportError: tagger = egg_info_cmd.egg_info -install_requires = ["arvados-python-client", "schema-salad"] +install_requires = ["arvados-python-client", "schema-salad", "python-magic"] web_requires = ["flask", "pyyaml"] needs_pytest = {"pytest", "test", "ptr"}.intersection(sys.argv) @@ -31,7 +31,8 @@ setup( author_email="peter.amstutz@curii.com", license="Apache 2.0", packages=["bh20sequploader", "bh20seqanalyzer", "bh20simplewebuploader"], - package_data={"bh20sequploader": ["bh20seq-schema.yml"]}, + package_data={"bh20sequploader": ["bh20seq-schema.yml", "validation/formats"], + }, install_requires=install_requires, extras_require={ 'web': web_requires -- cgit v1.2.3