From 88d81f853cf04b7f28681dd9cdee775b0422f252 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Tue, 21 Apr 2020 12:53:19 -0400 Subject: Working on NCBI import Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- bh20sequploader/bh20seq-schema.yml | 4 ++-- bh20sequploader/main.py | 7 ++++--- scripts/foreach.sh | 18 ++++++++++++++++++ scripts/from_genbank_to_fasta_and_yaml.py | 26 ++++++++++++++------------ 4 files changed, 38 insertions(+), 17 deletions(-) create mode 100755 scripts/foreach.sh mode change 100644 => 100755 scripts/from_genbank_to_fasta_and_yaml.py diff --git a/bh20sequploader/bh20seq-schema.yml b/bh20sequploader/bh20seq-schema.yml index 75308ab..ebca35b 100644 --- a/bh20sequploader/bh20seq-schema.yml +++ b/bh20sequploader/bh20seq-schema.yml @@ -162,12 +162,12 @@ $graph: _id: http://www.ebi.ac.uk/efo/EFO_0002699 sequencing_coverage: doc: Sequence coverage defined as the average number of reads representing a given nucleotide (e.g. 100x) - type: float? + type: ["null", float, int] jsonldPredicate: _id: http://purl.obolibrary.org/obo/FLU_0000848 sequencing_coverage2: doc: If a second sequence technology was used you can submit its coverage here - type: float? + type: ["null", float, int] jsonldPredicate: _id: http://purl.obolibrary.org/obo/FLU_0000848 additional_technology_information: diff --git a/bh20sequploader/main.py b/bh20sequploader/main.py index 49d012d..2fda347 100644 --- a/bh20sequploader/main.py +++ b/bh20sequploader/main.py @@ -44,7 +44,8 @@ def main(): with col.open(target, "w") as f: r = args.sequence.read(65536) - print(r[0:20]) + seqlabel = r[1:r.index("\n")] + print(seqlabel) while r: f.write(r) r = args.sequence.read(65536) @@ -67,8 +68,8 @@ def main(): "upload_user": "%s@%s" % (getpass.getuser(), socket.gethostname()) } - col.save_new(owner_uuid=UPLOAD_PROJECT, name="Uploaded by %s from %s" % - (properties['upload_user'], properties['upload_ip']), + col.save_new(owner_uuid=UPLOAD_PROJECT, name="%s uploaded by %s from %s" % + (seqlabel, properties['upload_user'], properties['upload_ip']), properties=properties, ensure_unique_name=True) print("Done") diff --git a/scripts/foreach.sh b/scripts/foreach.sh new file mode 100755 index 0000000..35b07b8 --- /dev/null +++ b/scripts/foreach.sh @@ -0,0 +1,18 @@ +#!/bin/sh +rm -rf validated fasta_and_yaml_* +mkdir -p validated +./from_genbank_to_fasta_and_yaml.py +fasta_files=$(find fasta_and_yaml_20200421/ -name "*.fasta") +for f in $fasta_files ; do + yaml=$(echo $f | rev | cut -c7- | rev).yaml + echo $f + echo $yaml + if bh20-seq-uploader --validate $f $yaml ; then + sz=$(stat --format=%s $f) + if test $sz -gt 20000 ; then + mv $f $yaml validated + else + echo "Fasta file too small" + fi + fi +done diff --git a/scripts/from_genbank_to_fasta_and_yaml.py b/scripts/from_genbank_to_fasta_and_yaml.py old mode 100644 new mode 100755 index 7e7c089..1a12513 --- a/scripts/from_genbank_to_fasta_and_yaml.py +++ b/scripts/from_genbank_to_fasta_and_yaml.py @@ -1,8 +1,10 @@ +#!/usr/bin/env python3 + from Bio import Entrez Entrez.email = 'another_email@gmail.com' import xml.etree.ElementTree as ET -import yaml +import json import os from datetime import date @@ -29,7 +31,7 @@ for term in term_list: # Remove the version in the id tmp_list = [x.split('.')[0] for x in tmp_list] - + print(term, len(tmp_list)) tmp_list=tmp_list # tmp_list = tmp_list[0:2] # restricting to small run @@ -49,11 +51,11 @@ print(term_list + ['NCBI Virus'], len(id_set)) def chunks(lst, n): for i in range(0, len(lst), n): yield lst[i:i + n] - + num_ids_for_request = 100 if not os.path.exists(dir_metadata_today): os.makedirs(dir_metadata_today) - + for i, id_x_list in enumerate(chunks(list(id_set), num_ids_for_request)): path_metadata_xxx_xml = os.path.join(dir_metadata_today, 'metadata_{}.xml'.format(i)) print('Requesting {} ids --> {}'.format(len(id_x_list), path_metadata_xxx_xml)) @@ -63,7 +65,7 @@ if not os.path.exists(dir_metadata_today): Entrez.efetch(db='nuccore', id=id_x_list, retmode='xml').read() ) - + term_to_uri_dict = {} for path_dict_xxx_csv in [os.path.join(dir_dict_ontology_standardization, name_xxx_csv) for name_xxx_csv in os.listdir(dir_dict_ontology_standardization) if name_xxx_csv.endswith('.csv')]: @@ -74,7 +76,7 @@ for path_dict_xxx_csv in [os.path.join(dir_dict_ontology_standardization, name_x if len(line.split(',')) > 2: term, uri = line.strip('\n').split('",') term = term.strip('"') - else: + else: term, uri = line.strip('\n').split(',') term_to_uri_dict[term] = uri @@ -125,7 +127,7 @@ if not os.path.exists(dir_fasta_and_yaml_today): ): if info_to_check in GBSeq_comment_text: tech_info_to_parse = GBSeq_comment_text.split('{} :: '.format(info_to_check))[1].split(' ;')[0] - + if field_in_yaml == 'sequencing_coverage': # A regular expression would be better! info_for_yaml_dict['technology'][field_in_yaml] = ';'.join( @@ -139,7 +141,7 @@ if not os.path.exists(dir_fasta_and_yaml_today): seq_tec = term_to_uri_dict[seq_tec] else: print(accession_version, 'missing technologies:', seq_tec) - + new_seq_tec_list.append(seq_tec) for n, seq_tec in enumerate(new_seq_tec_list): @@ -147,7 +149,7 @@ if not os.path.exists(dir_fasta_and_yaml_today): else: info_for_yaml_dict['technology'][field_in_yaml] = tech_info_to_parse - + #term_to_uri_dict for GBFeature in GBSeq.iter('GBFeature'): @@ -211,12 +213,12 @@ if not os.path.exists(dir_fasta_and_yaml_today): info_for_yaml_dict['virus']['virus_species'] = "http://purl.obolibrary.org/obo/NCBITaxon_"+GBQualifier_value_text.split('taxon:')[1] - #Remove technology key if empty! + # Remove technology key if empty! if (info_for_yaml_dict['technology']=={}): - del info_for_yaml_dict['key'] + del info_for_yaml_dict['technology'] with open(os.path.join(dir_fasta_and_yaml_today, '{}.fasta'.format(accession_version)), 'w') as fw: fw.write('>{}\n{}'.format(accession_version, GBSeq_sequence.text.upper())) with open(os.path.join(dir_fasta_and_yaml_today, '{}.yaml'.format(accession_version)), 'w') as fw: - yaml.dump(info_for_yaml_dict, fw, default_flow_style=False) + json.dump(info_for_yaml_dict, fw, indent=2) -- cgit v1.2.3 From 7e085b2958d9bd4f0a2b1912cf259a05b56366bc Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Tue, 21 Apr 2020 13:22:53 -0400 Subject: Tweak handling of "coverage" also fix typo Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- bh20sequploader/bh20seq-schema.yml | 4 ++-- bh20sequploader/bh20seq-shex.rdf | 2 +- scripts/dict_ontology_standardization/ncbi_speciesman_source.csv | 2 +- scripts/from_genbank_to_fasta_and_yaml.py | 9 ++++++--- 4 files changed, 10 insertions(+), 7 deletions(-) diff --git a/bh20sequploader/bh20seq-schema.yml b/bh20sequploader/bh20seq-schema.yml index ebca35b..75308ab 100644 --- a/bh20sequploader/bh20seq-schema.yml +++ b/bh20sequploader/bh20seq-schema.yml @@ -162,12 +162,12 @@ $graph: _id: http://www.ebi.ac.uk/efo/EFO_0002699 sequencing_coverage: doc: Sequence coverage defined as the average number of reads representing a given nucleotide (e.g. 100x) - type: ["null", float, int] + type: float? jsonldPredicate: _id: http://purl.obolibrary.org/obo/FLU_0000848 sequencing_coverage2: doc: If a second sequence technology was used you can submit its coverage here - type: ["null", float, int] + type: float? jsonldPredicate: _id: http://purl.obolibrary.org/obo/FLU_0000848 additional_technology_information: diff --git a/bh20sequploader/bh20seq-shex.rdf b/bh20sequploader/bh20seq-shex.rdf index 59ee71b..31e714f 100644 --- a/bh20sequploader/bh20seq-shex.rdf +++ b/bh20sequploader/bh20seq-shex.rdf @@ -50,7 +50,7 @@ PREFIX wikidata: :technologyShape { obo:OBI_0600047 IRI {0,2} ; - obo:FLU_0000848 xsd:integer ?; + obo:FLU_0000848 xsd:double ?; efo:EFO_0002699 xsd:string ?; } diff --git a/scripts/dict_ontology_standardization/ncbi_speciesman_source.csv b/scripts/dict_ontology_standardization/ncbi_speciesman_source.csv index 2905588..909cf37 100644 --- a/scripts/dict_ontology_standardization/ncbi_speciesman_source.csv +++ b/scripts/dict_ontology_standardization/ncbi_speciesman_source.csv @@ -1,4 +1,4 @@ -nasopharyngeal swab, http://purl.obolibrary.org/obo/NCIT_C155831 +nasopharyngeal swab,http://purl.obolibrary.org/obo/NCIT_C155831 nasopharyngeal exudate,http://purl.obolibrary.org/obo/NCIT_C155831 respiratory swab,http://purl.obolibrary.org/obo/NCIT_C155831 naso-pharyngeal exudate,http://purl.obolibrary.org/obo/NCIT_C155831 diff --git a/scripts/from_genbank_to_fasta_and_yaml.py b/scripts/from_genbank_to_fasta_and_yaml.py index 1a12513..00c0012 100755 --- a/scripts/from_genbank_to_fasta_and_yaml.py +++ b/scripts/from_genbank_to_fasta_and_yaml.py @@ -130,9 +130,12 @@ if not os.path.exists(dir_fasta_and_yaml_today): if field_in_yaml == 'sequencing_coverage': # A regular expression would be better! - info_for_yaml_dict['technology'][field_in_yaml] = ';'.join( - [x.strip('(average)').strip("reads/nt").replace(',', '.').strip(' xX>') for x in tech_info_to_parse.split(';')] - ) + try: + info_for_yaml_dict['technology'][field_in_yaml] = float( + tech_info_to_parse.strip('(average)').strip("reads/nt").replace(',', '.').strip(' xX>')) + except ValueError: + print(accession_version, "Couldn't make sense of Coverage '%s'" % tech_info_to_parse) + pass elif field_in_yaml == 'sample_sequencing_technology': new_seq_tec_list = [] for seq_tec in tech_info_to_parse.split(';'): -- cgit v1.2.3 From cad23032ecf6ef325aab2978d5df36609ad50088 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Tue, 21 Apr 2020 18:16:47 +0000 Subject: add noLinkCheck to specimen_source2 --- bh20sequploader/bh20seq-schema.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/bh20sequploader/bh20seq-schema.yml b/bh20sequploader/bh20seq-schema.yml index 75308ab..1ceebe2 100644 --- a/bh20sequploader/bh20seq-schema.yml +++ b/bh20sequploader/bh20seq-schema.yml @@ -106,6 +106,7 @@ $graph: jsonldPredicate: _id: http://purl.obolibrary.org/obo/OBI_0001479 _type: "@id" + noLinkCheck: true sample_storage_conditions: doc: Information about storage of a specified type, e.g. frozen specimen, paraffin, fresh .... type: string? -- cgit v1.2.3 From f4c3da88c1233802fea46cc972a81dc3b5b51185 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Tue, 21 Apr 2020 15:37:58 -0400 Subject: Work around CWL content size limit by chunking Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- bh20sequploader/main.py | 1 + workflows/pangenome-generate/relabel-seqs.cwl | 31 +++++++++++++++++++++++---- workflows/pangenome-generate/relabel-seqs.py | 22 +++++++++++++------ 3 files changed, 44 insertions(+), 10 deletions(-) diff --git a/bh20sequploader/main.py b/bh20sequploader/main.py index 2fda347..4c4711d 100644 --- a/bh20sequploader/main.py +++ b/bh20sequploader/main.py @@ -63,6 +63,7 @@ def main(): external_ip = urllib.request.urlopen('https://ident.me').read().decode('utf8') properties = { + "sequence_label": seqlabel, "upload_app": "bh20-seq-uploader", "upload_ip": external_ip, "upload_user": "%s@%s" % (getpass.getuser(), socket.gethostname()) diff --git a/workflows/pangenome-generate/relabel-seqs.cwl b/workflows/pangenome-generate/relabel-seqs.cwl index 2b780d4..01196f6 100644 --- a/workflows/pangenome-generate/relabel-seqs.cwl +++ b/workflows/pangenome-generate/relabel-seqs.cwl @@ -3,6 +3,10 @@ class: CommandLineTool inputs: readsFA: File[] subjects: string[] + script: + type: File + default: {class: File, location: relabel-seqs.py} + inputBinding: {} outputs: relabeledSeqs: type: File @@ -15,11 +19,30 @@ outputs: requirements: InlineJavascriptRequirement: {} InitialWorkDirRequirement: - listing: - - entry: {$include: relabel-seqs.py} - entryname: relabel-seqs.py + listing: | + ${ + var i = 0; + var b = 1; + var out = []; + for (; i < inputs.readsFA.length; i++) { + var block = []; + for (; i < (b*100) && i < inputs.readsFA.length; i++) { + block.push(inputs.readsFA[i]); + } + out.push({ + entryname: "block"+b, + entry: JSON.stringify(block) + }); + b++; + } + out.push({ + entry: JSON.stringify(inputs.subjects), + entryname: "subjects" + }); + return out; + } hints: DockerRequirement: dockerPull: commonworkflowlanguage/cwltool_module stdout: -baseCommand: [python, relabel-seqs.py] +baseCommand: [python] diff --git a/workflows/pangenome-generate/relabel-seqs.py b/workflows/pangenome-generate/relabel-seqs.py index 1188ceb..970540f 100644 --- a/workflows/pangenome-generate/relabel-seqs.py +++ b/workflows/pangenome-generate/relabel-seqs.py @@ -1,5 +1,15 @@ -reads = $(inputs.readsFA) -subjects = $(inputs.subjects) +import os +import json + +reads = [] +b = 1 +while os.path.exists("block%i" % b): + with open("block%i" % b) as f: + reads.extend(json.load(f)) + b += 1 + +with open("subjects") as f: + subjects = json.load(f) relabeled_fasta = open("relabeledSeqs.fasta", "wt") original_labels = open("originalLabels.ttl", "wt") @@ -7,12 +17,12 @@ original_labels = open("originalLabels.ttl", "wt") for i, r in enumerate(reads): with open(r["path"], "rt") as fa: label = fa.readline() - original_labels.write("<%s> \\"%s\\" .\\n" % (subjects[i], label[1:].strip().replace('"', '\\\\"'))) - relabeled_fasta.write(">"+subjects[i]+"\\n") + original_labels.write("<%s> \"%s\" .\n" % (subjects[i], label[1:].strip().replace('"', '\\"'))) + relabeled_fasta.write(">"+subjects[i]+"\n") data = fa.read(8096) while data: relabeled_fasta.write(data) - endswithnewline = data.endswith("\\n") + endswithnewline = data.endswith("\n") data = fa.read(8096) if not endswithnewline: - relabeled_fasta.write("\\n") + relabeled_fasta.write("\n") -- cgit v1.2.3 From 61726edb9293fe529e6efbe5bb6f1cc953bb3c4e Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Tue, 21 Apr 2020 16:20:10 -0400 Subject: Workaround CWL limit by chunking file list Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- workflows/pangenome-generate/merge-metadata.cwl | 46 +++++++++++++++++++++---- workflows/pangenome-generate/merge-metadata.py | 33 ++++++++++++------ workflows/pangenome-generate/relabel-seqs.cwl | 12 ++++--- workflows/pangenome-generate/relabel-seqs.py | 18 +++++----- 4 files changed, 79 insertions(+), 30 deletions(-) diff --git a/workflows/pangenome-generate/merge-metadata.cwl b/workflows/pangenome-generate/merge-metadata.cwl index fcefe32..4d9c808 100644 --- a/workflows/pangenome-generate/merge-metadata.cwl +++ b/workflows/pangenome-generate/merge-metadata.cwl @@ -5,16 +5,48 @@ hints: dockerPull: commonworkflowlanguage/cwltool_module inputs: metadata: File[] - metadataSchema: File subjects: string[] - dups: File? - originalLabels: File + metadataSchema: + type: File + inputBinding: {position: 2} + originalLabels: + type: File + inputBinding: {position: 3} + dups: + type: File? + inputBinding: {position: 4} + script: + type: File + inputBinding: {position: 1} + default: {class: File, location: merge-metadata.py} outputs: merged: stdout stdout: mergedmetadata.ttl requirements: + InlineJavascriptRequirement: {} InitialWorkDirRequirement: - listing: - - entry: {$include: merge-metadata.py} - entryname: merge-metadata.py -baseCommand: [python3, merge-metadata.py] + listing: | + ${ + var i = 0; + var b = 1; + var out = []; + for (; i < inputs.metadata.length; i++) { + var block = []; + var sub = []; + for (; i < (b*150) && i < inputs.metadata.length; i++) { + block.push(inputs.metadata[i]); + sub.push(inputs.subjects[i]); + } + out.push({ + entryname: "block"+b, + entry: JSON.stringify(block) + }); + out.push({ + entryname: "subs"+b, + entry: JSON.stringify(sub) + }); + b++; + } + return out; + } +baseCommand: python diff --git a/workflows/pangenome-generate/merge-metadata.py b/workflows/pangenome-generate/merge-metadata.py index bfec781..65d08a6 100644 --- a/workflows/pangenome-generate/merge-metadata.py +++ b/workflows/pangenome-generate/merge-metadata.py @@ -2,12 +2,27 @@ import re import schema_salad.schema import schema_salad.jsonld_context import json +import sys +import os +import logging + +metadataSchema = sys.argv[1] +originalLabels = sys.argv[2] +dups = None +if len(sys.argv) == 4: + dups = sys.argv[3] + +def readitems(stem): + items = [] + b = 1 + while os.path.exists("%s%i" % (stem, b)): + with open("%s%i" % (stem, b)) as f: + items.extend(json.load(f)) + b += 1 + return items -metadataSchema = '$(inputs.metadataSchema.path)' -metadata = $(inputs.metadata) -subjects = $(inputs.subjects) -dups = json.loads('''$(inputs.dups)''') -originalLabels = $(inputs.originalLabels) +metadata = readitems("block") +subjects = readitems("subs") (document_loader, avsc_names, @@ -20,17 +35,15 @@ for i, m in enumerate(metadata): g = schema_salad.jsonld_context.makerdf(subjects[i], doc, document_loader.ctx) print(g.serialize(format="ntriples").decode("utf-8")) -import logging - if dups: - sameseqs = open(dups["path"], "rt") + sameseqs = open(dups, "rt") for d in sameseqs: logging.warn(d) - g = re.match(r"\\d+\\t(.*)", d) + g = re.match(r"\d+\t(.*)", d) logging.warn("%s", g.group(1)) sp = g.group(1).split(",") for n in sp[1:]: print("<%s> <%s> ." % (n.strip(), sp[0].strip())) -orig = open(originalLabels["path"], "rt") +orig = open(originalLabels, "rt") print(orig.read()) diff --git a/workflows/pangenome-generate/relabel-seqs.cwl b/workflows/pangenome-generate/relabel-seqs.cwl index 01196f6..c1f17a4 100644 --- a/workflows/pangenome-generate/relabel-seqs.cwl +++ b/workflows/pangenome-generate/relabel-seqs.cwl @@ -26,19 +26,21 @@ requirements: var out = []; for (; i < inputs.readsFA.length; i++) { var block = []; - for (; i < (b*100) && i < inputs.readsFA.length; i++) { + var sub = []; + for (; i < (b*150) && i < inputs.readsFA.length; i++) { block.push(inputs.readsFA[i]); + sub.push(inputs.subjects[i]); } out.push({ entryname: "block"+b, entry: JSON.stringify(block) }); + out.push({ + entryname: "subs"+b, + entry: JSON.stringify(sub) + }); b++; } - out.push({ - entry: JSON.stringify(inputs.subjects), - entryname: "subjects" - }); return out; } hints: diff --git a/workflows/pangenome-generate/relabel-seqs.py b/workflows/pangenome-generate/relabel-seqs.py index 970540f..6b022a0 100644 --- a/workflows/pangenome-generate/relabel-seqs.py +++ b/workflows/pangenome-generate/relabel-seqs.py @@ -1,15 +1,17 @@ import os import json -reads = [] -b = 1 -while os.path.exists("block%i" % b): - with open("block%i" % b) as f: - reads.extend(json.load(f)) - b += 1 +def readitems(stem): + items = [] + b = 1 + while os.path.exists("%s%i" % (stem, b)): + with open("%s%i" % (stem, b)) as f: + items.extend(json.load(f)) + b += 1 + return items -with open("subjects") as f: - subjects = json.load(f) +reads = readitems("block") +subjects = readitems("subs") relabeled_fasta = open("relabeledSeqs.fasta", "wt") original_labels = open("originalLabels.ttl", "wt") -- cgit v1.2.3 From ce696b41b3476891ecb05185d64c289b140a73af Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Tue, 21 Apr 2020 17:09:16 -0400 Subject: Increase ram requirement for minimap2 Add --kickoff to immediately start an analysis workflow. Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- bh20seqanalyzer/main.py | 14 +++++++++++++- workflows/pangenome-generate/minimap2.cwl | 2 +- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/bh20seqanalyzer/main.py b/bh20seqanalyzer/main.py index 193a268..8d0f562 100644 --- a/bh20seqanalyzer/main.py +++ b/bh20seqanalyzer/main.py @@ -214,14 +214,26 @@ def main(): parser.add_argument('--fastq-workflow-uuid', type=str, default='lugli-7fd4e-2zp9q4jo5xpif9y', help='') parser.add_argument('--latest-result-collection', type=str, default='lugli-4zz18-z513nlpqm03hpca', help='') + parser.add_argument('--kickoff', action="store_true") args = parser.parse_args() api = arvados.api() - logging.info("Starting up, monitoring %s for uploads" % (args.uploader_project)) + schema_ref = upload_schema(api, args.workflow_def_project) + if args.kickoff: + logging.info("Starting a single analysis run") + start_pangenome_analysis(api, + args.pangenome_analysis_project, + args.pangenome_workflow_uuid, + args.validated_project, + schema_ref) + return + + logging.info("Starting up, monitoring %s for uploads" % (args.uploader_project)) + while True: move_fastq_to_fasta_results(api, args.fastq_project, args.uploader_project) diff --git a/workflows/pangenome-generate/minimap2.cwl b/workflows/pangenome-generate/minimap2.cwl index bf19ef7..42d1dce 100644 --- a/workflows/pangenome-generate/minimap2.cwl +++ b/workflows/pangenome-generate/minimap2.cwl @@ -12,7 +12,7 @@ hints: ResourceRequirement: coresMin: 8 coresMax: 32 - ramMin: $(7 * 1024) + ramMin: $(9 * 1024) outdirMin: $(Math.ceil(inputs.readsFA.size/(1024*1024*1024) + 20)) stdout: $(inputs.readsFA.nameroot).paf baseCommand: minimap2 -- cgit v1.2.3