diff options
Diffstat (limited to 'bh20seqanalyzer')
-rw-r--r-- | bh20seqanalyzer/main.py | 71 |
1 file changed, 48 insertions, 23 deletions
diff --git a/bh20seqanalyzer/main.py b/bh20seqanalyzer/main.py index 31ad4c4..9a36cae 100644 --- a/bh20seqanalyzer/main.py +++ b/bh20seqanalyzer/main.py @@ -30,6 +30,7 @@ def validate_upload(api, collection, validated_project, try: metadata_content = ruamel.yaml.round_trip_load(col.open("metadata.yaml")) metadata_content["id"] = "http://arvados.org/keep:%s/metadata.yaml" % collection["portable_data_hash"] + sample_id = metadata_content["sample"]["sample_id"] add_lc_filename(metadata_content, metadata_content["id"]) valid = qc_metadata(metadata_content) and valid except Exception as e: @@ -39,21 +40,25 @@ def validate_upload(api, collection, validated_project, logging.warn("Failed metadata qc") if valid: - tgt = None - for n in ("sequence.fasta", "reads.fastq"): - if n not in col: - continue - with col.open(n) as qf: - tgt = qc_fasta(qf) - if tgt != n: - logging.info("Expected %s but magic says it should be %s", n, tgt) - valid = False - elif tgt == "reads.fastq": - start_fastq_to_fasta(api, collection, fastq_project, fastq_workflow_uuid) - return False - if tgt is None: + try: + tgt = None + paired = {"reads_1.fastq": "reads.fastq", "reads_1.fastq.gz": "reads.fastq.gz"} + for n in ("sequence.fasta", "reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"): + if n not in col: + continue + with col.open(n, 'rb') as qf: + tgt = qc_fasta(qf)[0] + if tgt != n and tgt != paired.get(n): + logging.info("Expected %s but magic says it should be %s", n, tgt) + valid = False + elif tgt in ("reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"): + start_fastq_to_fasta(api, collection, fastq_project, fastq_workflow_uuid, n, sample_id) + return False + if tgt is None: + valid = False + logging.warn("Upload '%s' does not contain sequence.fasta, reads.fastq or reads_1.fastq", collection["name"]) + except ValueError as v: valid = False - logging.warn("Upload '%s' does not contain sequence.fasta or reads.fastq", collection["name"]) dup = 
api.collections().list(filters=[["owner_uuid", "=", validated_project], ["portable_data_hash", "=", col.portable_data_hash()]]).execute() @@ -69,9 +74,8 @@ def validate_upload(api, collection, validated_project, "owner_uuid": validated_project, "name": "%s (%s)" % (collection["name"], time.asctime(time.gmtime()))}).execute() else: - pass # It is invalid, delete it. - #logging.warn("Deleting '%s'" % collection["name"]) + logging.warn("Suggest deleting '%s'" % collection["name"]) #api.collections().delete(uuid=collection["uuid"]).execute() return valid @@ -95,6 +99,7 @@ def run_workflow(api, parent_project, workflow_uuid, name, inputobj): tmp.name] logging.info("Running %s" % ' '.join(cmd)) comp = subprocess.run(cmd, capture_output=True) + logging.info("Submitted %s", comp.stdout) if comp.returncode != 0: logging.error(comp.stderr.decode('utf-8')) @@ -103,12 +108,11 @@ def run_workflow(api, parent_project, workflow_uuid, name, inputobj): def start_fastq_to_fasta(api, collection, analysis_project, - fastq_workflow_uuid): - newproject = run_workflow(api, analysis_project, fastq_workflow_uuid, "FASTQ to FASTA", { - "fastq_forward": { - "class": "File", - "location": "keep:%s/reads.fastq" % collection["portable_data_hash"] - }, + fastq_workflow_uuid, + tgt, + sample_id): + + params = { "metadata": { "class": "File", "location": "keep:%s/metadata.yaml" % collection["portable_data_hash"] @@ -116,8 +120,26 @@ def start_fastq_to_fasta(api, collection, "ref_fasta": { "class": "File", "location": "keep:ffef6a3b77e5e04f8f62a7b6f67264d1+556/SARS-CoV2-NC_045512.2.fasta" + }, + "sample_id": sample_id + } + + if tgt.startswith("reads.fastq"): + params["fastq_forward"] = { + "class": "File", + "location": "keep:%s/%s" % (collection["portable_data_hash"], tgt) + } + elif tgt.startswith("reads_1.fastq"): + params["fastq_forward"] = { + "class": "File", + "location": "keep:%s/reads_1.%s" % (collection["portable_data_hash"], tgt[8:]) } - }) + params["fastq_reverse"] = { + "class": 
"File", + "location": "keep:%s/reads_2.%s" % (collection["portable_data_hash"], tgt[8:]) + } + + newproject = run_workflow(api, analysis_project, fastq_workflow_uuid, "FASTQ to FASTA", params) api.collections().update(uuid=collection["uuid"], body={"owner_uuid": newproject["uuid"]}).execute() @@ -222,6 +244,7 @@ def main(): parser.add_argument('--latest-result-collection', type=str, default='lugli-4zz18-z513nlpqm03hpca', help='') parser.add_argument('--kickoff', action="store_true") + parser.add_argument('--once', action="store_true") args = parser.parse_args() api = arvados.api() @@ -265,4 +288,6 @@ def main(): args.pangenome_analysis_project, args.latest_result_collection) + if args.once: + break time.sleep(15) |