about summary refs log tree commit diff
diff options
context:
space:
mode:
authorPeter Amstutz2020-06-22 15:06:10 +0000
committerPeter Amstutz2020-06-22 15:06:10 +0000
commit7daa9ff2cdba742a811db00c924ccde25fa2c9b6 (patch)
treef6f001d224c82825c08b091f221f85897f60d40d
parent1554fb6b4daf263f034d46a5f5b26ebcc3e69d22 (diff)
downloadbh20-seq-resource-7daa9ff2cdba742a811db00c924ccde25fa2c9b6.tar.gz
bh20-seq-resource-7daa9ff2cdba742a811db00c924ccde25fa2c9b6.tar.lz
bh20-seq-resource-7daa9ff2cdba742a811db00c924ccde25fa2c9b6.zip
Handle upload & assembly of gzipped, paired-end fastq
-rw-r--r--bh20seqanalyzer/main.py48
-rw-r--r--bh20sequploader/main.py58
-rw-r--r--bh20sequploader/qc_fasta.py24
-rw-r--r--scripts/docker/Dockerfile4
4 files changed, 92 insertions, 42 deletions
diff --git a/bh20seqanalyzer/main.py b/bh20seqanalyzer/main.py
index 31ad4c4..794ce27 100644
--- a/bh20seqanalyzer/main.py
+++ b/bh20seqanalyzer/main.py
@@ -40,20 +40,21 @@ def validate_upload(api, collection, validated_project,
 
     if valid:
         tgt = None
-        for n in ("sequence.fasta", "reads.fastq"):
+        paired = {"reads_1.fastq": "reads.fastq", "reads_1.fastq.gz": "reads.fastq.gz"}
+        for n in ("sequence.fasta", "reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"):
             if n not in col:
                 continue
-            with col.open(n) as qf:
-                tgt = qc_fasta(qf)
-                if tgt != n:
+            with col.open(n, 'rb') as qf:
+                tgt = qc_fasta(qf)[0]
+                if tgt != n and tgt != paired.get(n):
                     logging.info("Expected %s but magic says it should be %s", n, tgt)
                     valid = False
-                elif tgt == "reads.fastq":
-                    start_fastq_to_fasta(api, collection, fastq_project, fastq_workflow_uuid)
+                elif tgt in ("reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"):
+                    start_fastq_to_fasta(api, collection, fastq_project, fastq_workflow_uuid, n)
                     return False
         if tgt is None:
             valid = False
-            logging.warn("Upload '%s' does not contain sequence.fasta or reads.fastq", collection["name"])
+            logging.warn("Upload '%s' does not contain sequence.fasta, reads.fastq or reads_1.fastq", collection["name"])
 
     dup = api.collections().list(filters=[["owner_uuid", "=", validated_project],
                                           ["portable_data_hash", "=", col.portable_data_hash()]]).execute()
@@ -95,6 +96,7 @@ def run_workflow(api, parent_project, workflow_uuid, name, inputobj):
                tmp.name]
         logging.info("Running %s" % ' '.join(cmd))
         comp = subprocess.run(cmd, capture_output=True)
+    logging.info("Submitted %s", comp.stdout)
     if comp.returncode != 0:
         logging.error(comp.stderr.decode('utf-8'))
 
@@ -103,12 +105,10 @@ def run_workflow(api, parent_project, workflow_uuid, name, inputobj):
 
 def start_fastq_to_fasta(api, collection,
                          analysis_project,
-                         fastq_workflow_uuid):
-    newproject = run_workflow(api, analysis_project, fastq_workflow_uuid, "FASTQ to FASTA", {
-        "fastq_forward": {
-            "class": "File",
-            "location": "keep:%s/reads.fastq" % collection["portable_data_hash"]
-        },
+                         fastq_workflow_uuid,
+                         tgt):
+
+    params = {
         "metadata": {
             "class": "File",
             "location": "keep:%s/metadata.yaml" % collection["portable_data_hash"]
@@ -117,7 +117,24 @@ def start_fastq_to_fasta(api, collection,
             "class": "File",
             "location": "keep:ffef6a3b77e5e04f8f62a7b6f67264d1+556/SARS-CoV2-NC_045512.2.fasta"
         }
-    })
+    }
+
+    if tgt.startswith("reads.fastq"):
+        params["fastq_forward"] = {
+            "class": "File",
+            "location": "keep:%s/%s" % (collection["portable_data_hash"], tgt)
+        }
+    elif tgt.startswith("reads_1.fastq"):
+        params["fastq_forward"] = {
+            "class": "File",
+            "location": "keep:%s/reads_1.%s" % (collection["portable_data_hash"], tgt[8:])
+        }
+        params["fastq_reverse"] = {
+            "class": "File",
+            "location": "keep:%s/reads_2.%s" % (collection["portable_data_hash"], tgt[8:])
+        }
+
+    newproject = run_workflow(api, analysis_project, fastq_workflow_uuid, "FASTQ to FASTA", params)
     api.collections().update(uuid=collection["uuid"],
                              body={"owner_uuid": newproject["uuid"]}).execute()
 
@@ -222,6 +239,7 @@ def main():
 
     parser.add_argument('--latest-result-collection', type=str, default='lugli-4zz18-z513nlpqm03hpca', help='')
     parser.add_argument('--kickoff', action="store_true")
+    parser.add_argument('--once', action="store_true")
     args = parser.parse_args()
 
     api = arvados.api()
@@ -265,4 +283,6 @@ def main():
                                 args.pangenome_analysis_project,
                                 args.latest_result_collection)
 
+        if args.once:
+            break
         time.sleep(15)
diff --git a/bh20sequploader/main.py b/bh20sequploader/main.py
index a2e62fa..c442af0 100644
--- a/bh20sequploader/main.py
+++ b/bh20sequploader/main.py
@@ -22,18 +22,10 @@ ARVADOS_API_HOST='lugli.arvadosapi.com'
 ARVADOS_API_TOKEN='2fbebpmbo3rw3x05ueu2i6nx70zhrsb1p22ycu3ry34m4x4462'
 UPLOAD_PROJECT='lugli-j7d0g-n5clictpuvwk8aa'
 
-def main():
-    parser = argparse.ArgumentParser(description='Upload SARS-CoV-19 sequences for analysis')
-    parser.add_argument('sequence', type=argparse.FileType('r'), help='sequence FASTA/FASTQ')
-    parser.add_argument('metadata', type=argparse.FileType('r'), help='sequence metadata json')
-    parser.add_argument("--validate", action="store_true", help="Dry run, validate only")
-    args = parser.parse_args()
-
-    api = arvados.api(host=ARVADOS_API_HOST, token=ARVADOS_API_TOKEN, insecure=True)
-
+def qa_stuff(metadata, sequence_p1, sequence_p2):
     try:
         log.debug("Checking metadata")
-        if not qc_metadata(args.metadata.name):
+        if not qc_metadata(metadata.name):
             log.warning("Failed metadata qc")
             exit(1)
     except ValueError as e:
@@ -42,29 +34,52 @@ def main():
         print(e)
         exit(1)
 
+    target = []
     try:
-        log.debug("Checking FASTA QC")
-        target = qc_fasta(args.sequence)
+        log.debug("Checking FASTA/FASTQ QC")
+        target.append(qc_fasta(sequence_p1))
+        if sequence_p2:
+            target.append(qc_fasta(sequence_p2))
+            target[0] = ("reads_1."+target[0][0][6:], target[0][1])
+            target[1] = ("reads_2."+target[1][0][6:], target[0][1])
     except ValueError as e:
         log.debug(e)
         log.debug("Failed FASTA qc")
         print(e)
         exit(1)
 
+    return target
+
+def upload_sequence(col, target, sequence):
+    with col.open(target[0], "wb") as f:
+        r = sequence.read(65536)
+        while r:
+            f.write(r)
+            r = sequence.read(65536)
+
+
+def main():
+    parser = argparse.ArgumentParser(description='Upload SARS-CoV-19 sequences for analysis')
+    parser.add_argument('metadata', type=argparse.FileType('r'), help='sequence metadata json')
+    parser.add_argument('sequence_p1', type=argparse.FileType('rb'), help='sequence FASTA/FASTQ')
+    parser.add_argument('sequence_p2', type=argparse.FileType('rb'), default=None, help='sequence FASTQ pair')
+    parser.add_argument("--validate", action="store_true", help="Dry run, validate only")
+    args = parser.parse_args()
+
+    api = arvados.api(host=ARVADOS_API_HOST, token=ARVADOS_API_TOKEN, insecure=True)
+
+    target = qa_stuff(args.metadata, args.sequence_p1, args.sequence_p2)
+    seqlabel = target[0][1]
+
     if args.validate:
         print("Valid")
         exit(0)
 
     col = arvados.collection.Collection(api_client=api)
 
-    with col.open(target, "w") as f:
-        r = args.sequence.read(65536)
-        seqlabel = r[1:r.index("\n")]
-        print(seqlabel)
-        while r:
-            f.write(r)
-            r = args.sequence.read(65536)
-    args.sequence.close()
+    upload_sequence(col, target[0], args.sequence_p1)
+    if args.sequence_p2:
+        upload_sequence(col, target[1], args.sequence_p2)
 
     print("Reading metadata")
     with col.open("metadata.yaml", "w") as f:
@@ -73,7 +88,6 @@ def main():
         while r:
             f.write(r)
             r = args.metadata.read(65536)
-    args.metadata.close()
 
     external_ip = urllib.request.urlopen('https://ident.me').read().decode('utf8')
 
@@ -93,6 +107,8 @@ def main():
                  (seqlabel, properties['upload_user'], properties['upload_ip']),
                  properties=properties, ensure_unique_name=True)
 
+    print("Saved to %s" % col.manifest_locator())
+
     print("Done")
 
 if __name__ == "__main__":
diff --git a/bh20sequploader/qc_fasta.py b/bh20sequploader/qc_fasta.py
index 5c8cf3a..607c8c0 100644
--- a/bh20sequploader/qc_fasta.py
+++ b/bh20sequploader/qc_fasta.py
@@ -5,6 +5,8 @@ import subprocess
 import tempfile
 import logging
 import re
+import io
+import gzip
 
 log = logging.getLogger(__name__ )
 
@@ -23,7 +25,7 @@ def read_fasta(sequence):
             raise ValueError("FASTA file contains multiple entries")
     return label, bases
 
-def qc_fasta(sequence):
+def qc_fasta(arg_sequence):
     log.debug("Starting qc_fasta")
     schema_resource = pkg_resources.resource_stream(__name__, "validation/formats")
     with tempfile.NamedTemporaryFile() as tmp:
@@ -31,12 +33,24 @@ def qc_fasta(sequence):
         tmp.flush()
         val = magic.Magic(magic_file=tmp.name,
                           uncompress=False, mime=True)
-    seq_type = val.from_buffer(sequence.read(4096)).lower()
+
+    gz = ""
+    if arg_sequence.name.endswith(".gz"):
+        sequence = gzip.GzipFile(fileobj=arg_sequence, mode='rb')
+        gz = ".gz"
+    else:
+        sequence = arg_sequence
+
+    sequence = io.TextIOWrapper(sequence)
+    r = sequence.read(4096)
     sequence.seek(0)
+
+    seqlabel = r[1:r.index("\n")]
+    seq_type = val.from_buffer(r).lower()
+
     if seq_type == "text/fasta":
         # ensure that contains only one entry
         submitlabel, submitseq = read_fasta(sequence)
-        sequence.seek(0)
 
         with tempfile.NamedTemporaryFile() as tmp1:
             refstring = pkg_resources.resource_string(__name__, "SARS-CoV-2-reference.fasta")
@@ -71,8 +85,8 @@ def qc_fasta(sequence):
             if similarity < 70.0:
                 raise ValueError("QC fail: submit similarity is less than 70%")
 
-        return "sequence.fasta"
+        return ("sequence.fasta"+gz, seqlabel)
     elif seq_type == "text/fastq":
-        return "reads.fastq"
+        return ("reads.fastq"+gz, seqlabel)
     else:
         raise ValueError("Sequence file does not look like a DNA FASTA or FASTQ")
diff --git a/scripts/docker/Dockerfile b/scripts/docker/Dockerfile
index 5bd38dd..9fb33d5 100644
--- a/scripts/docker/Dockerfile
+++ b/scripts/docker/Dockerfile
@@ -4,7 +4,7 @@ RUN apt-get update && \
     apt-get -yq --no-install-recommends -o Acquire::Retries=6 install \
     python3 python3-pip python3-setuptools python3-dev python-pycurl \
     clustalw python3-biopython libcurl4-openssl-dev build-essential \
-    libssl-dev && \
+    libssl-dev libmagic-dev python3-magic && \
     apt-get clean
 
-RUN pip3 install bh20-seq-uploader
\ No newline at end of file
+RUN pip3 install bh20-seq-uploader