From f4c3da88c1233802fea46cc972a81dc3b5b51185 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Tue, 21 Apr 2020 15:37:58 -0400 Subject: Work around CWL content size limit by chunking Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- workflows/pangenome-generate/relabel-seqs.cwl | 31 +++++++++++++++++++++++---- workflows/pangenome-generate/relabel-seqs.py | 22 +++++++++++++------ 2 files changed, 43 insertions(+), 10 deletions(-) (limited to 'workflows') diff --git a/workflows/pangenome-generate/relabel-seqs.cwl b/workflows/pangenome-generate/relabel-seqs.cwl index 2b780d4..01196f6 100644 --- a/workflows/pangenome-generate/relabel-seqs.cwl +++ b/workflows/pangenome-generate/relabel-seqs.cwl @@ -3,6 +3,10 @@ class: CommandLineTool inputs: readsFA: File[] subjects: string[] + script: + type: File + default: {class: File, location: relabel-seqs.py} + inputBinding: {} outputs: relabeledSeqs: type: File @@ -15,11 +19,30 @@ outputs: requirements: InlineJavascriptRequirement: {} InitialWorkDirRequirement: - listing: - - entry: {$include: relabel-seqs.py} - entryname: relabel-seqs.py + listing: | + ${ + var i = 0; + var b = 1; + var out = []; + for (; i < inputs.readsFA.length; i++) { + var block = []; + for (; i < (b*100) && i < inputs.readsFA.length; i++) { + block.push(inputs.readsFA[i]); + } + out.push({ + entryname: "block"+b, + entry: JSON.stringify(block) + }); + b++; + } + out.push({ + entry: JSON.stringify(inputs.subjects), + entryname: "subjects" + }); + return out; + } hints: DockerRequirement: dockerPull: commonworkflowlanguage/cwltool_module stdout: -baseCommand: [python, relabel-seqs.py] +baseCommand: [python] diff --git a/workflows/pangenome-generate/relabel-seqs.py b/workflows/pangenome-generate/relabel-seqs.py index 1188ceb..970540f 100644 --- a/workflows/pangenome-generate/relabel-seqs.py +++ b/workflows/pangenome-generate/relabel-seqs.py @@ -1,5 +1,15 @@ -reads = $(inputs.readsFA) -subjects = $(inputs.subjects) +import os +import json + +reads = [] +b = 1 +while os.path.exists("block%i" % b): + with open("block%i" % b) as f: + reads.extend(json.load(f)) + b += 1 + +with open("subjects") as f: + subjects = json.load(f) relabeled_fasta = open("relabeledSeqs.fasta", "wt") original_labels = open("originalLabels.ttl", "wt") @@ -7,12 +17,12 @@ original_labels = open("originalLabels.ttl", "wt") for i, r in enumerate(reads): with open(r["path"], "rt") as fa: label = fa.readline() - original_labels.write("<%s> \\"%s\\" .\\n" % (subjects[i], label[1:].strip().replace('"', '\\\\"'))) - relabeled_fasta.write(">"+subjects[i]+"\\n") + original_labels.write("<%s> \"%s\" .\n" % (subjects[i], label[1:].strip().replace('"', '\\"'))) + relabeled_fasta.write(">"+subjects[i]+"\n") data = fa.read(8096) while data: relabeled_fasta.write(data) - endswithnewline = data.endswith("\\n") + endswithnewline = data.endswith("\n") data = fa.read(8096) if not endswithnewline: - relabeled_fasta.write("\\n") + relabeled_fasta.write("\n") -- cgit v1.2.3 From 61726edb9293fe529e6efbe5bb6f1cc953bb3c4e Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Tue, 21 Apr 2020 16:20:10 -0400 Subject: Workaround CWL limit by chunking file list Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- workflows/pangenome-generate/merge-metadata.cwl | 46 +++++++++++++++++++++---- workflows/pangenome-generate/merge-metadata.py | 33 ++++++++++++------ workflows/pangenome-generate/relabel-seqs.cwl | 12 ++++--- workflows/pangenome-generate/relabel-seqs.py | 18 +++++----- 4 files changed, 79 insertions(+), 30 deletions(-) (limited to 'workflows') diff --git a/workflows/pangenome-generate/merge-metadata.cwl b/workflows/pangenome-generate/merge-metadata.cwl index fcefe32..4d9c808 100644 --- a/workflows/pangenome-generate/merge-metadata.cwl +++ b/workflows/pangenome-generate/merge-metadata.cwl @@ -5,16 +5,48 @@ hints: dockerPull: commonworkflowlanguage/cwltool_module inputs: metadata: File[] - metadataSchema: File subjects: string[] - dups: File? - originalLabels: File + metadataSchema: + type: File + inputBinding: {position: 2} + originalLabels: + type: File + inputBinding: {position: 3} + dups: + type: File? + inputBinding: {position: 4} + script: + type: File + inputBinding: {position: 1} + default: {class: File, location: merge-metadata.py} outputs: merged: stdout stdout: mergedmetadata.ttl requirements: + InlineJavascriptRequirement: {} InitialWorkDirRequirement: - listing: - - entry: {$include: merge-metadata.py} - entryname: merge-metadata.py -baseCommand: [python3, merge-metadata.py] + listing: | + ${ + var i = 0; + var b = 1; + var out = []; + for (; i < inputs.metadata.length; i++) { + var block = []; + var sub = []; + for (; i < (b*150) && i < inputs.metadata.length; i++) { + block.push(inputs.metadata[i]); + sub.push(inputs.subjects[i]); + } + out.push({ + entryname: "block"+b, + entry: JSON.stringify(block) + }); + out.push({ + entryname: "subs"+b, + entry: JSON.stringify(sub) + }); + b++; + } + return out; + } +baseCommand: python diff --git a/workflows/pangenome-generate/merge-metadata.py b/workflows/pangenome-generate/merge-metadata.py index bfec781..65d08a6 100644 --- a/workflows/pangenome-generate/merge-metadata.py +++ b/workflows/pangenome-generate/merge-metadata.py @@ -2,12 +2,27 @@ import re import schema_salad.schema import schema_salad.jsonld_context import json +import sys +import os +import logging + +metadataSchema = sys.argv[1] +originalLabels = sys.argv[2] +dups = None +if len(sys.argv) == 4: + dups = sys.argv[3] + +def readitems(stem): + items = [] + b = 1 + while os.path.exists("%s%i" % (stem, b)): + with open("%s%i" % (stem, b)) as f: + items.extend(json.load(f)) + b += 1 + return items -metadataSchema = '$(inputs.metadataSchema.path)' -metadata = $(inputs.metadata) -subjects = $(inputs.subjects) -dups = json.loads('''$(inputs.dups)''') -originalLabels = $(inputs.originalLabels) +metadata = readitems("block") +subjects = readitems("subs") (document_loader, avsc_names, @@ -20,17 +35,15 @@ for i, m in enumerate(metadata): g = schema_salad.jsonld_context.makerdf(subjects[i], doc, document_loader.ctx) print(g.serialize(format="ntriples").decode("utf-8")) -import logging - if dups: - sameseqs = open(dups["path"], "rt") + sameseqs = open(dups, "rt") for d in sameseqs: logging.warn(d) - g = re.match(r"\\d+\\t(.*)", d) + g = re.match(r"\d+\t(.*)", d) logging.warn("%s", g.group(1)) sp = g.group(1).split(",") for n in sp[1:]: print("<%s> <%s> ." % (n.strip(), sp[0].strip())) -orig = open(originalLabels["path"], "rt") +orig = open(originalLabels, "rt") print(orig.read()) diff --git a/workflows/pangenome-generate/relabel-seqs.cwl b/workflows/pangenome-generate/relabel-seqs.cwl index 01196f6..c1f17a4 100644 --- a/workflows/pangenome-generate/relabel-seqs.cwl +++ b/workflows/pangenome-generate/relabel-seqs.cwl @@ -26,19 +26,21 @@ requirements: var out = []; for (; i < inputs.readsFA.length; i++) { var block = []; - for (; i < (b*100) && i < inputs.readsFA.length; i++) { + var sub = []; + for (; i < (b*150) && i < inputs.readsFA.length; i++) { block.push(inputs.readsFA[i]); + sub.push(inputs.subjects[i]); } out.push({ entryname: "block"+b, entry: JSON.stringify(block) }); + out.push({ + entryname: "subs"+b, + entry: JSON.stringify(sub) + }); b++; } - out.push({ - entry: JSON.stringify(inputs.subjects), - entryname: "subjects" - }); return out; } hints: diff --git a/workflows/pangenome-generate/relabel-seqs.py b/workflows/pangenome-generate/relabel-seqs.py index 970540f..6b022a0 100644 --- a/workflows/pangenome-generate/relabel-seqs.py +++ b/workflows/pangenome-generate/relabel-seqs.py @@ -1,15 +1,17 @@ import os import json -reads = [] -b = 1 -while os.path.exists("block%i" % b): - with open("block%i" % b) as f: - reads.extend(json.load(f)) - b += 1 +def readitems(stem): + items = [] + b = 1 + while os.path.exists("%s%i" % (stem, b)): + with open("%s%i" % (stem, b)) as f: + items.extend(json.load(f)) + b += 1 + return items -with open("subjects") as f: - subjects = json.load(f) +reads = readitems("block") +subjects = readitems("subs") relabeled_fasta = open("relabeledSeqs.fasta", "wt") original_labels = open("originalLabels.ttl", "wt") -- cgit v1.2.3 From ce696b41b3476891ecb05185d64c289b140a73af Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Tue, 21 Apr 2020 17:09:16 -0400 Subject: Increase ram requirement for minimap2 Add --kickoff to immediately start an analysis workflow. Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- bh20seqanalyzer/main.py | 14 +++++++++++++- workflows/pangenome-generate/minimap2.cwl | 2 +- 2 files changed, 14 insertions(+), 2 deletions(-) (limited to 'workflows') diff --git a/bh20seqanalyzer/main.py b/bh20seqanalyzer/main.py index 193a268..8d0f562 100644 --- a/bh20seqanalyzer/main.py +++ b/bh20seqanalyzer/main.py @@ -214,14 +214,26 @@ def main(): parser.add_argument('--fastq-workflow-uuid', type=str, default='lugli-7fd4e-2zp9q4jo5xpif9y', help='') parser.add_argument('--latest-result-collection', type=str, default='lugli-4zz18-z513nlpqm03hpca', help='') + parser.add_argument('--kickoff', action="store_true") args = parser.parse_args() api = arvados.api() - logging.info("Starting up, monitoring %s for uploads" % (args.uploader_project)) + schema_ref = upload_schema(api, args.workflow_def_project) + if args.kickoff: + logging.info("Starting a single analysis run") + start_pangenome_analysis(api, + args.pangenome_analysis_project, + args.pangenome_workflow_uuid, + args.validated_project, + schema_ref) + return + + logging.info("Starting up, monitoring %s for uploads" % (args.uploader_project)) + while True: move_fastq_to_fasta_results(api, args.fastq_project, args.uploader_project) diff --git a/workflows/pangenome-generate/minimap2.cwl b/workflows/pangenome-generate/minimap2.cwl index bf19ef7..42d1dce 100644 --- a/workflows/pangenome-generate/minimap2.cwl +++ b/workflows/pangenome-generate/minimap2.cwl @@ -12,7 +12,7 @@ hints: ResourceRequirement: coresMin: 8 coresMax: 32 - ramMin: $(7 * 1024) + ramMin: $(9 * 1024) outdirMin: $(Math.ceil(inputs.readsFA.size/(1024*1024*1024) + 20)) stdout: $(inputs.readsFA.nameroot).paf baseCommand: minimap2 -- cgit v1.2.3