aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPeter Amstutz2020-06-22 15:06:10 +0000
committerPeter Amstutz2020-06-22 15:06:10 +0000
commit7daa9ff2cdba742a811db00c924ccde25fa2c9b6 (patch)
treef6f001d224c82825c08b091f221f85897f60d40d
parent1554fb6b4daf263f034d46a5f5b26ebcc3e69d22 (diff)
downloadbh20-seq-resource-7daa9ff2cdba742a811db00c924ccde25fa2c9b6.tar.gz
bh20-seq-resource-7daa9ff2cdba742a811db00c924ccde25fa2c9b6.tar.lz
bh20-seq-resource-7daa9ff2cdba742a811db00c924ccde25fa2c9b6.zip
Handle upload & assembly of gzipped, paired-end fastq
-rw-r--r--bh20seqanalyzer/main.py48
-rw-r--r--bh20sequploader/main.py58
-rw-r--r--bh20sequploader/qc_fasta.py24
-rw-r--r--scripts/docker/Dockerfile4
4 files changed, 92 insertions, 42 deletions
diff --git a/bh20seqanalyzer/main.py b/bh20seqanalyzer/main.py
index 31ad4c4..794ce27 100644
--- a/bh20seqanalyzer/main.py
+++ b/bh20seqanalyzer/main.py
@@ -40,20 +40,21 @@ def validate_upload(api, collection, validated_project,
if valid:
tgt = None
- for n in ("sequence.fasta", "reads.fastq"):
+ paired = {"reads_1.fastq": "reads.fastq", "reads_1.fastq.gz": "reads.fastq.gz"}
+ for n in ("sequence.fasta", "reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"):
if n not in col:
continue
- with col.open(n) as qf:
- tgt = qc_fasta(qf)
- if tgt != n:
+ with col.open(n, 'rb') as qf:
+ tgt = qc_fasta(qf)[0]
+ if tgt != n and tgt != paired.get(n):
logging.info("Expected %s but magic says it should be %s", n, tgt)
valid = False
- elif tgt == "reads.fastq":
- start_fastq_to_fasta(api, collection, fastq_project, fastq_workflow_uuid)
+ elif tgt in ("reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"):
+ start_fastq_to_fasta(api, collection, fastq_project, fastq_workflow_uuid, n)
return False
if tgt is None:
valid = False
- logging.warn("Upload '%s' does not contain sequence.fasta or reads.fastq", collection["name"])
+ logging.warn("Upload '%s' does not contain sequence.fasta, reads.fastq or reads_1.fastq", collection["name"])
dup = api.collections().list(filters=[["owner_uuid", "=", validated_project],
["portable_data_hash", "=", col.portable_data_hash()]]).execute()
@@ -95,6 +96,7 @@ def run_workflow(api, parent_project, workflow_uuid, name, inputobj):
tmp.name]
logging.info("Running %s" % ' '.join(cmd))
comp = subprocess.run(cmd, capture_output=True)
+ logging.info("Submitted %s", comp.stdout)
if comp.returncode != 0:
logging.error(comp.stderr.decode('utf-8'))
@@ -103,12 +105,10 @@ def run_workflow(api, parent_project, workflow_uuid, name, inputobj):
def start_fastq_to_fasta(api, collection,
analysis_project,
- fastq_workflow_uuid):
- newproject = run_workflow(api, analysis_project, fastq_workflow_uuid, "FASTQ to FASTA", {
- "fastq_forward": {
- "class": "File",
- "location": "keep:%s/reads.fastq" % collection["portable_data_hash"]
- },
+ fastq_workflow_uuid,
+ tgt):
+
+ params = {
"metadata": {
"class": "File",
"location": "keep:%s/metadata.yaml" % collection["portable_data_hash"]
@@ -117,7 +117,24 @@ def start_fastq_to_fasta(api, collection,
"class": "File",
"location": "keep:ffef6a3b77e5e04f8f62a7b6f67264d1+556/SARS-CoV2-NC_045512.2.fasta"
}
- })
+ }
+
+ if tgt.startswith("reads.fastq"):
+ params["fastq_forward"] = {
+ "class": "File",
+ "location": "keep:%s/%s" % (collection["portable_data_hash"], tgt)
+ }
+ elif tgt.startswith("reads_1.fastq"):
+ params["fastq_forward"] = {
+ "class": "File",
+ "location": "keep:%s/reads_1.%s" % (collection["portable_data_hash"], tgt[8:])
+ }
+ params["fastq_reverse"] = {
+ "class": "File",
+ "location": "keep:%s/reads_2.%s" % (collection["portable_data_hash"], tgt[8:])
+ }
+
+ newproject = run_workflow(api, analysis_project, fastq_workflow_uuid, "FASTQ to FASTA", params)
api.collections().update(uuid=collection["uuid"],
body={"owner_uuid": newproject["uuid"]}).execute()
@@ -222,6 +239,7 @@ def main():
parser.add_argument('--latest-result-collection', type=str, default='lugli-4zz18-z513nlpqm03hpca', help='')
parser.add_argument('--kickoff', action="store_true")
+ parser.add_argument('--once', action="store_true")
args = parser.parse_args()
api = arvados.api()
@@ -265,4 +283,6 @@ def main():
args.pangenome_analysis_project,
args.latest_result_collection)
+ if args.once:
+ break
time.sleep(15)
diff --git a/bh20sequploader/main.py b/bh20sequploader/main.py
index a2e62fa..c442af0 100644
--- a/bh20sequploader/main.py
+++ b/bh20sequploader/main.py
@@ -22,18 +22,10 @@ ARVADOS_API_HOST='lugli.arvadosapi.com'
ARVADOS_API_TOKEN='2fbebpmbo3rw3x05ueu2i6nx70zhrsb1p22ycu3ry34m4x4462'
UPLOAD_PROJECT='lugli-j7d0g-n5clictpuvwk8aa'
-def main():
- parser = argparse.ArgumentParser(description='Upload SARS-CoV-19 sequences for analysis')
- parser.add_argument('sequence', type=argparse.FileType('r'), help='sequence FASTA/FASTQ')
- parser.add_argument('metadata', type=argparse.FileType('r'), help='sequence metadata json')
- parser.add_argument("--validate", action="store_true", help="Dry run, validate only")
- args = parser.parse_args()
-
- api = arvados.api(host=ARVADOS_API_HOST, token=ARVADOS_API_TOKEN, insecure=True)
-
+def qa_stuff(metadata, sequence_p1, sequence_p2):
try:
log.debug("Checking metadata")
- if not qc_metadata(args.metadata.name):
+ if not qc_metadata(metadata.name):
log.warning("Failed metadata qc")
exit(1)
except ValueError as e:
@@ -42,29 +34,52 @@ def main():
print(e)
exit(1)
+ target = []
try:
- log.debug("Checking FASTA QC")
- target = qc_fasta(args.sequence)
+ log.debug("Checking FASTA/FASTQ QC")
+ target.append(qc_fasta(sequence_p1))
+ if sequence_p2:
+ target.append(qc_fasta(sequence_p2))
+ target[0] = ("reads_1."+target[0][0][6:], target[0][1])
+ target[1] = ("reads_2."+target[1][0][6:], target[0][1])
except ValueError as e:
log.debug(e)
log.debug("Failed FASTA qc")
print(e)
exit(1)
+ return target
+
+def upload_sequence(col, target, sequence):
+ with col.open(target[0], "wb") as f:
+ r = sequence.read(65536)
+ while r:
+ f.write(r)
+ r = sequence.read(65536)
+
+
+def main():
+ parser = argparse.ArgumentParser(description='Upload SARS-CoV-19 sequences for analysis')
+ parser.add_argument('metadata', type=argparse.FileType('r'), help='sequence metadata json')
+ parser.add_argument('sequence_p1', type=argparse.FileType('rb'), help='sequence FASTA/FASTQ')
+ parser.add_argument('sequence_p2', type=argparse.FileType('rb'), default=None, help='sequence FASTQ pair')
+ parser.add_argument("--validate", action="store_true", help="Dry run, validate only")
+ args = parser.parse_args()
+
+ api = arvados.api(host=ARVADOS_API_HOST, token=ARVADOS_API_TOKEN, insecure=True)
+
+ target = qa_stuff(args.metadata, args.sequence_p1, args.sequence_p2)
+ seqlabel = target[0][1]
+
if args.validate:
print("Valid")
exit(0)
col = arvados.collection.Collection(api_client=api)
- with col.open(target, "w") as f:
- r = args.sequence.read(65536)
- seqlabel = r[1:r.index("\n")]
- print(seqlabel)
- while r:
- f.write(r)
- r = args.sequence.read(65536)
- args.sequence.close()
+ upload_sequence(col, target[0], args.sequence_p1)
+ if args.sequence_p2:
+ upload_sequence(col, target[1], args.sequence_p2)
print("Reading metadata")
with col.open("metadata.yaml", "w") as f:
@@ -73,7 +88,6 @@ def main():
while r:
f.write(r)
r = args.metadata.read(65536)
- args.metadata.close()
external_ip = urllib.request.urlopen('https://ident.me').read().decode('utf8')
@@ -93,6 +107,8 @@ def main():
(seqlabel, properties['upload_user'], properties['upload_ip']),
properties=properties, ensure_unique_name=True)
+ print("Saved to %s" % col.manifest_locator())
+
print("Done")
if __name__ == "__main__":
diff --git a/bh20sequploader/qc_fasta.py b/bh20sequploader/qc_fasta.py
index 5c8cf3a..607c8c0 100644
--- a/bh20sequploader/qc_fasta.py
+++ b/bh20sequploader/qc_fasta.py
@@ -5,6 +5,8 @@ import subprocess
import tempfile
import logging
import re
+import io
+import gzip
log = logging.getLogger(__name__ )
@@ -23,7 +25,7 @@ def read_fasta(sequence):
raise ValueError("FASTA file contains multiple entries")
return label, bases
-def qc_fasta(sequence):
+def qc_fasta(arg_sequence):
log.debug("Starting qc_fasta")
schema_resource = pkg_resources.resource_stream(__name__, "validation/formats")
with tempfile.NamedTemporaryFile() as tmp:
@@ -31,12 +33,24 @@ def qc_fasta(sequence):
tmp.flush()
val = magic.Magic(magic_file=tmp.name,
uncompress=False, mime=True)
- seq_type = val.from_buffer(sequence.read(4096)).lower()
+
+ gz = ""
+ if arg_sequence.name.endswith(".gz"):
+ sequence = gzip.GzipFile(fileobj=arg_sequence, mode='rb')
+ gz = ".gz"
+ else:
+ sequence = arg_sequence
+
+ sequence = io.TextIOWrapper(sequence)
+ r = sequence.read(4096)
sequence.seek(0)
+
+ seqlabel = r[1:r.index("\n")]
+ seq_type = val.from_buffer(r).lower()
+
if seq_type == "text/fasta":
# ensure that contains only one entry
submitlabel, submitseq = read_fasta(sequence)
- sequence.seek(0)
with tempfile.NamedTemporaryFile() as tmp1:
refstring = pkg_resources.resource_string(__name__, "SARS-CoV-2-reference.fasta")
@@ -71,8 +85,8 @@ def qc_fasta(sequence):
if similarity < 70.0:
raise ValueError("QC fail: submit similarity is less than 70%")
- return "sequence.fasta"
+ return ("sequence.fasta"+gz, seqlabel)
elif seq_type == "text/fastq":
- return "reads.fastq"
+ return ("reads.fastq"+gz, seqlabel)
else:
raise ValueError("Sequence file does not look like a DNA FASTA or FASTQ")
diff --git a/scripts/docker/Dockerfile b/scripts/docker/Dockerfile
index 5bd38dd..9fb33d5 100644
--- a/scripts/docker/Dockerfile
+++ b/scripts/docker/Dockerfile
@@ -4,7 +4,7 @@ RUN apt-get update && \
apt-get -yq --no-install-recommends -o Acquire::Retries=6 install \
python3 python3-pip python3-setuptools python3-dev python-pycurl \
clustalw python3-biopython libcurl4-openssl-dev build-essential \
- libssl-dev && \
+ libssl-dev libmagic-dev python3-magic && \
apt-get clean
-RUN pip3 install bh20-seq-uploader \ No newline at end of file
+RUN pip3 install bh20-seq-uploader