From 7daa9ff2cdba742a811db00c924ccde25fa2c9b6 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Mon, 22 Jun 2020 15:06:10 +0000 Subject: Handle upload & assembly of gzipped, paired-end fastq --- bh20seqanalyzer/main.py | 48 ++++++++++++++++++++++++++----------- bh20sequploader/main.py | 58 +++++++++++++++++++++++++++++---------------- bh20sequploader/qc_fasta.py | 24 +++++++++++++++---- scripts/docker/Dockerfile | 4 ++-- 4 files changed, 92 insertions(+), 42 deletions(-) diff --git a/bh20seqanalyzer/main.py b/bh20seqanalyzer/main.py index 31ad4c4..794ce27 100644 --- a/bh20seqanalyzer/main.py +++ b/bh20seqanalyzer/main.py @@ -40,20 +40,21 @@ def validate_upload(api, collection, validated_project, if valid: tgt = None - for n in ("sequence.fasta", "reads.fastq"): + paired = {"reads_1.fastq": "reads.fastq", "reads_1.fastq.gz": "reads.fastq.gz"} + for n in ("sequence.fasta", "reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"): if n not in col: continue - with col.open(n) as qf: - tgt = qc_fasta(qf) - if tgt != n: + with col.open(n, 'rb') as qf: + tgt = qc_fasta(qf)[0] + if tgt != n and tgt != paired.get(n): logging.info("Expected %s but magic says it should be %s", n, tgt) valid = False - elif tgt == "reads.fastq": - start_fastq_to_fasta(api, collection, fastq_project, fastq_workflow_uuid) + elif tgt in ("reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"): + start_fastq_to_fasta(api, collection, fastq_project, fastq_workflow_uuid, n) return False if tgt is None: valid = False - logging.warn("Upload '%s' does not contain sequence.fasta or reads.fastq", collection["name"]) + logging.warn("Upload '%s' does not contain sequence.fasta, reads.fastq or reads_1.fastq", collection["name"]) dup = api.collections().list(filters=[["owner_uuid", "=", validated_project], ["portable_data_hash", "=", col.portable_data_hash()]]).execute() @@ -95,6 +96,7 @@ def run_workflow(api, parent_project, workflow_uuid, name, inputobj): tmp.name] logging.info("Running %s" % ' '.join(cmd)) comp = subprocess.run(cmd, capture_output=True) + logging.info("Submitted %s", comp.stdout) if comp.returncode != 0: logging.error(comp.stderr.decode('utf-8')) @@ -103,12 +105,10 @@ def run_workflow(api, parent_project, workflow_uuid, name, inputobj): def start_fastq_to_fasta(api, collection, analysis_project, - fastq_workflow_uuid): - newproject = run_workflow(api, analysis_project, fastq_workflow_uuid, "FASTQ to FASTA", { - "fastq_forward": { - "class": "File", - "location": "keep:%s/reads.fastq" % collection["portable_data_hash"] - }, + fastq_workflow_uuid, + tgt): + + params = { "metadata": { "class": "File", "location": "keep:%s/metadata.yaml" % collection["portable_data_hash"] @@ -117,7 +117,24 @@ def start_fastq_to_fasta(api, collection, "class": "File", "location": "keep:ffef6a3b77e5e04f8f62a7b6f67264d1+556/SARS-CoV2-NC_045512.2.fasta" } - }) + } + + if tgt.startswith("reads.fastq"): + params["fastq_forward"] = { + "class": "File", + "location": "keep:%s/%s" % (collection["portable_data_hash"], tgt) + } + elif tgt.startswith("reads_1.fastq"): + params["fastq_forward"] = { + "class": "File", + "location": "keep:%s/reads_1.%s" % (collection["portable_data_hash"], tgt[8:]) + } + params["fastq_reverse"] = { + "class": "File", + "location": "keep:%s/reads_2.%s" % (collection["portable_data_hash"], tgt[8:]) + } + + newproject = run_workflow(api, analysis_project, fastq_workflow_uuid, "FASTQ to FASTA", params) api.collections().update(uuid=collection["uuid"], body={"owner_uuid": newproject["uuid"]}).execute() @@ -222,6 +239,7 @@ def main(): parser.add_argument('--latest-result-collection', type=str, default='lugli-4zz18-z513nlpqm03hpca', help='') parser.add_argument('--kickoff', action="store_true") + parser.add_argument('--once', action="store_true") args = parser.parse_args() api = arvados.api() @@ -265,4 +283,6 @@ def main(): args.pangenome_analysis_project, args.latest_result_collection) + if args.once: + break time.sleep(15) diff --git a/bh20sequploader/main.py b/bh20sequploader/main.py index a2e62fa..c442af0 100644 --- a/bh20sequploader/main.py +++ b/bh20sequploader/main.py @@ -22,18 +22,10 @@ ARVADOS_API_HOST='lugli.arvadosapi.com' ARVADOS_API_TOKEN='2fbebpmbo3rw3x05ueu2i6nx70zhrsb1p22ycu3ry34m4x4462' UPLOAD_PROJECT='lugli-j7d0g-n5clictpuvwk8aa' -def main(): - parser = argparse.ArgumentParser(description='Upload SARS-CoV-19 sequences for analysis') - parser.add_argument('sequence', type=argparse.FileType('r'), help='sequence FASTA/FASTQ') - parser.add_argument('metadata', type=argparse.FileType('r'), help='sequence metadata json') - parser.add_argument("--validate", action="store_true", help="Dry run, validate only") - args = parser.parse_args() - - api = arvados.api(host=ARVADOS_API_HOST, token=ARVADOS_API_TOKEN, insecure=True) - +def qa_stuff(metadata, sequence_p1, sequence_p2): try: log.debug("Checking metadata") - if not qc_metadata(args.metadata.name): + if not qc_metadata(metadata.name): log.warning("Failed metadata qc") exit(1) except ValueError as e: @@ -42,29 +34,52 @@ def main(): print(e) exit(1) + target = [] try: - log.debug("Checking FASTA QC") - target = qc_fasta(args.sequence) + log.debug("Checking FASTA/FASTQ QC") + target.append(qc_fasta(sequence_p1)) + if sequence_p2: + target.append(qc_fasta(sequence_p2)) + target[0] = ("reads_1."+target[0][0][6:], target[0][1]) + target[1] = ("reads_2."+target[1][0][6:], target[0][1]) except ValueError as e: log.debug(e) log.debug("Failed FASTA qc") print(e) exit(1) + return target + +def upload_sequence(col, target, sequence): + with col.open(target[0], "wb") as f: + r = sequence.read(65536) + while r: + f.write(r) + r = sequence.read(65536) + + +def main(): + parser = argparse.ArgumentParser(description='Upload SARS-CoV-19 sequences for analysis') + parser.add_argument('metadata', type=argparse.FileType('r'), help='sequence metadata json') + parser.add_argument('sequence_p1', type=argparse.FileType('rb'), help='sequence FASTA/FASTQ') + parser.add_argument('sequence_p2', type=argparse.FileType('rb'), default=None, help='sequence FASTQ pair') + parser.add_argument("--validate", action="store_true", help="Dry run, validate only") + args = parser.parse_args() + + api = arvados.api(host=ARVADOS_API_HOST, token=ARVADOS_API_TOKEN, insecure=True) + + target = qa_stuff(args.metadata, args.sequence_p1, args.sequence_p2) + seqlabel = target[0][1] + if args.validate: print("Valid") exit(0) col = arvados.collection.Collection(api_client=api) - with col.open(target, "w") as f: - r = args.sequence.read(65536) - seqlabel = r[1:r.index("\n")] - print(seqlabel) - while r: - f.write(r) - r = args.sequence.read(65536) - args.sequence.close() + upload_sequence(col, target[0], args.sequence_p1) + if args.sequence_p2: + upload_sequence(col, target[1], args.sequence_p2) print("Reading metadata") with col.open("metadata.yaml", "w") as f: @@ -73,7 +88,6 @@ def main(): while r: f.write(r) r = args.metadata.read(65536) - args.metadata.close() external_ip = urllib.request.urlopen('https://ident.me').read().decode('utf8') @@ -93,6 +107,8 @@ def main(): (seqlabel, properties['upload_user'], properties['upload_ip']), properties=properties, ensure_unique_name=True) + print("Saved to %s" % col.manifest_locator()) + print("Done") if __name__ == "__main__": diff --git a/bh20sequploader/qc_fasta.py b/bh20sequploader/qc_fasta.py index 5c8cf3a..607c8c0 100644 --- a/bh20sequploader/qc_fasta.py +++ b/bh20sequploader/qc_fasta.py @@ -5,6 +5,8 @@ import subprocess import tempfile import logging import re +import io +import gzip log = logging.getLogger(__name__ ) @@ -23,7 +25,7 @@ def read_fasta(sequence): raise ValueError("FASTA file contains multiple entries") return label, bases -def qc_fasta(sequence): +def qc_fasta(arg_sequence): log.debug("Starting qc_fasta") schema_resource = pkg_resources.resource_stream(__name__, "validation/formats") with tempfile.NamedTemporaryFile() as tmp: @@ -31,12 +33,24 @@ def qc_fasta(sequence): tmp.flush() val = magic.Magic(magic_file=tmp.name, uncompress=False, mime=True) - seq_type = val.from_buffer(sequence.read(4096)).lower() + + gz = "" + if arg_sequence.name.endswith(".gz"): + sequence = gzip.GzipFile(fileobj=arg_sequence, mode='rb') + gz = ".gz" + else: + sequence = arg_sequence + + sequence = io.TextIOWrapper(sequence) + r = sequence.read(4096) sequence.seek(0) + + seqlabel = r[1:r.index("\n")] + seq_type = val.from_buffer(r).lower() + if seq_type == "text/fasta": # ensure that contains only one entry submitlabel, submitseq = read_fasta(sequence) - sequence.seek(0) with tempfile.NamedTemporaryFile() as tmp1: refstring = pkg_resources.resource_string(__name__, "SARS-CoV-2-reference.fasta") @@ -71,8 +85,8 @@ def qc_fasta(sequence): if similarity < 70.0: raise ValueError("QC fail: submit similarity is less than 70%") - return "sequence.fasta" + return ("sequence.fasta"+gz, seqlabel) elif seq_type == "text/fastq": - return "reads.fastq" + return ("reads.fastq"+gz, seqlabel) else: raise ValueError("Sequence file does not look like a DNA FASTA or FASTQ") diff --git a/scripts/docker/Dockerfile b/scripts/docker/Dockerfile index 5bd38dd..9fb33d5 100644 --- a/scripts/docker/Dockerfile +++ b/scripts/docker/Dockerfile @@ -4,7 +4,7 @@ RUN apt-get update && \ apt-get -yq --no-install-recommends -o Acquire::Retries=6 install \ python3 python3-pip python3-setuptools python3-dev python-pycurl \ clustalw python3-biopython libcurl4-openssl-dev build-essential \ - libssl-dev && \ + libssl-dev libmagic-dev python3-magic && \ apt-get clean -RUN pip3 install bh20-seq-uploader \ No newline at end of file +RUN pip3 install bh20-seq-uploader -- cgit v1.2.3