From 53ff8af0843942d83dff9fd5b95d1ae98e80fe27 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Thu, 16 Jul 2020 11:48:35 -0400 Subject: Refactor analysis code into a class Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- bh20seqanalyzer/main.py | 597 ++++++++++++++++++++++-------------------- bh20simplewebuploader/main.py | 28 +- 2 files changed, 326 insertions(+), 299 deletions(-) diff --git a/bh20seqanalyzer/main.py b/bh20seqanalyzer/main.py index 0b52e6b..f2bb234 100644 --- a/bh20seqanalyzer/main.py +++ b/bh20seqanalyzer/main.py @@ -16,277 +16,306 @@ logging.basicConfig(format="[%(asctime)s] %(levelname)s %(message)s", datefmt="% level=logging.INFO) logging.getLogger("googleapiclient.discovery").setLevel(logging.WARN) -def validate_upload(api, collection, validated_project, - fastq_project, fastq_workflow_uuid, - revalidate): - col = arvados.collection.Collection(collection["uuid"]) - - if not revalidate and collection["properties"].get("status") in ("validated", "rejected"): - return False - - # validate the collection here. Check metadata, etc. - logging.info("Validating upload '%s' (%s)" % (collection["name"], collection["uuid"])) - - errors = [] - - if collection["owner_uuid"] != validated_project: - dup = api.collections().list(filters=[["owner_uuid", "=", validated_project], - ["portable_data_hash", "=", col.portable_data_hash()]]).execute() - if dup["items"]: - # This exact collection has been uploaded before. - errors.append("Duplicate of %s" % ([d["uuid"] for d in dup["items"]])) - - if not errors: - if "metadata.yaml" not in col: - errors.append("Missing metadata.yaml", collection["name"]) - else: +class SeqAnalyzer: + + def __init__(self, api, keepclient, + uploader_project, + pangenome_analysis_project, + fastq_project, + validated_project, + workflow_def_project, + pangenome_workflow_uuid, + fastq_workflow_uuid, + exclude_list, + latest_result_collection): + self.api = api + self.keepclient = keepclient + self.uploader_project = uploader_project + self.pangenome_analysis_project = pangenome_analysis_project + self.fastq_project = fastq_project + self.validated_project = validated_project + self.workflow_def_project = workflow_def_project + self.pangenome_workflow_uuid = pangenome_workflow_uuid + self.fastq_workflow_uuid = fastq_workflow_uuid + self.exclude_list = exclude_list + self.latest_result_uuid = latest_result_collection + self.schema_ref = None + + def validate_upload(self, collection, revalidate): + col = arvados.collection.Collection(collection["uuid"], api_client=self.api, keep_client=self.keepclient) + + if not revalidate and collection["properties"].get("status") in ("validated", "rejected"): + return False + + # validate the collection here. Check metadata, etc. + logging.info("Validating upload '%s' (%s)" % (collection["name"], collection["uuid"])) + + errors = [] + + if collection["owner_uuid"] != self.validated_project: + dup = self.api.collections().list(filters=[["owner_uuid", "=", self.validated_project], + ["portable_data_hash", "=", col.portable_data_hash()]]).execute() + if dup["items"]: + # This exact collection has been uploaded before. + errors.append("Duplicate of %s" % ([d["uuid"] for d in dup["items"]])) + + if not errors: + if "metadata.yaml" not in col: + errors.append("Missing metadata.yaml", collection["name"]) + else: + try: + with col.open("metadata.yaml") as md: + metadata_content = ruamel.yaml.round_trip_load(md) + metadata_content["id"] = "http://arvados.org/keep:%s/metadata.yaml" % collection["portable_data_hash"] + sample_id = metadata_content["sample"]["sample_id"] + add_lc_filename(metadata_content, metadata_content["id"]) + valid = qc_metadata(metadata_content) + if not valid: + errors.append("Failed metadata qc") + except Exception as e: + errors.append(str(e)) + + if not errors: try: - metadata_content = ruamel.yaml.round_trip_load(col.open("metadata.yaml")) - metadata_content["id"] = "http://arvados.org/keep:%s/metadata.yaml" % collection["portable_data_hash"] - sample_id = metadata_content["sample"]["sample_id"] - add_lc_filename(metadata_content, metadata_content["id"]) - valid = qc_metadata(metadata_content) - if not valid: - errors.append("Failed metadata qc") - except Exception as e: - errors.append(str(e)) - - if not errors: - try: - tgt = None - paired = {"reads_1.fastq": "reads.fastq", "reads_1.fastq.gz": "reads.fastq.gz"} - for n in ("sequence.fasta", "reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"): - if n not in col: - continue - with col.open(n, 'rb') as qf: - tgt = qc_fasta(qf)[0] - if tgt != n and tgt != paired.get(n): - errors.append("Expected %s but magic says it should be %s", n, tgt) - elif tgt in ("reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"): - start_fastq_to_fasta(api, collection, fastq_project, fastq_workflow_uuid, n, sample_id) - return False - if tgt is None: - errors.append("Upload '%s' does not contain sequence.fasta, reads.fastq or reads_1.fastq", collection["name"]) - except Exception as v: - errors.append(str(v)) - - - if not errors: - # Move it to the "validated" project to be included in the next analysis - if "errors" in collection["properties"]: - del collection["properties"]["errors"] - collection["properties"]["status"] = "validated" - api.collections().update(uuid=collection["uuid"], body={ - "owner_uuid": validated_project, - "name": "%s (%s)" % (collection["name"], time.asctime(time.gmtime())), - "properties": collection["properties"]}).execute() - logging.info("Added '%s' to validated sequences" % collection["name"]) - return True - else: - # It is invalid - logging.warn("'%s' (%s) has validation errors: %s" % ( - collection["name"], collection["uuid"], "\n".join(errors))) - collection["properties"]["status"] = "rejected" - collection["properties"]["errors"] = errors - api.collections().update(uuid=collection["uuid"], body={"properties": collection["properties"]}).execute() - return False - - -def run_workflow(api, parent_project, workflow_uuid, name, inputobj): - project = api.groups().create(body={ - "group_class": "project", - "name": name, - "owner_uuid": parent_project, - }, ensure_unique_name=True).execute() - - with tempfile.NamedTemporaryFile() as tmp: - tmp.write(json.dumps(inputobj, indent=2).encode('utf-8')) - tmp.flush() - cmd = ["arvados-cwl-runner", - "--submit", - "--no-wait", - "--project-uuid=%s" % project["uuid"], - "arvwf:%s" % workflow_uuid, - tmp.name] - logging.info("Running %s" % ' '.join(cmd)) - comp = subprocess.run(cmd, capture_output=True) - logging.info("Submitted %s", comp.stdout) - if comp.returncode != 0: - logging.error(comp.stderr.decode('utf-8')) - - return project - - -def start_fastq_to_fasta(api, collection, - analysis_project, - fastq_workflow_uuid, - tgt, - sample_id): - - params = { - "metadata": { - "class": "File", - "location": "keep:%s/metadata.yaml" % collection["portable_data_hash"] - }, - "ref_fasta": { - "class": "File", - "location": "keep:ffef6a3b77e5e04f8f62a7b6f67264d1+556/SARS-CoV2-NC_045512.2.fasta" - }, - "sample_id": sample_id - } - - if tgt.startswith("reads.fastq"): - params["fastq_forward"] = { - "class": "File", - "location": "keep:%s/%s" % (collection["portable_data_hash"], tgt) - } - elif tgt.startswith("reads_1.fastq"): - params["fastq_forward"] = { - "class": "File", - "location": "keep:%s/reads_1.%s" % (collection["portable_data_hash"], tgt[8:]) - } - params["fastq_reverse"] = { - "class": "File", - "location": "keep:%s/reads_2.%s" % (collection["portable_data_hash"], tgt[8:]) + tgt = None + paired = {"reads_1.fastq": "reads.fastq", "reads_1.fastq.gz": "reads.fastq.gz"} + for n in ("sequence.fasta", "reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"): + if n not in col: + continue + with col.open(n, 'rb') as qf: + tgt = qc_fasta(qf)[0] + if tgt != n and tgt != paired.get(n): + errors.append("Expected %s but magic says it should be %s", n, tgt) + elif tgt in ("reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"): + self.start_fastq_to_fasta(collection, n, sample_id) + return False + if tgt is None: + errors.append("Upload '%s' does not contain sequence.fasta, reads.fastq or reads_1.fastq", collection["name"]) + except Exception as v: + errors.append(str(v)) + + + if not errors: + # Move it to the "validated" project to be included in the next analysis + if "errors" in collection["properties"]: + del collection["properties"]["errors"] + collection["properties"]["status"] = "validated" + self.api.collections().update(uuid=collection["uuid"], body={ + "owner_uuid": self.validated_project, + "name": "%s (%s)" % (collection["name"], time.asctime(time.gmtime())), + "properties": collection["properties"]}).execute() + logging.info("Added '%s' to validated sequences" % collection["name"]) + return True + else: + # It is invalid + logging.warn("'%s' (%s) has validation errors: %s" % ( + collection["name"], collection["uuid"], "\n".join(errors))) + collection["properties"]["status"] = "rejected" + collection["properties"]["errors"] = errors + self.api.collections().update(uuid=collection["uuid"], body={"properties": collection["properties"]}).execute() + return False + + + def run_workflow(self, parent_project, workflow_uuid, name, inputobj): + project = self.api.groups().create(body={ + "group_class": "project", + "name": name, + "owner_uuid": parent_project, + }, ensure_unique_name=True).execute() + + with tempfile.NamedTemporaryFile() as tmp: + tmp.write(json.dumps(inputobj, indent=2).encode('utf-8')) + tmp.flush() + cmd = ["arvados-cwl-runner", + "--submit", + "--no-wait", + "--project-uuid=%s" % project["uuid"], + "arvwf:%s" % workflow_uuid, + tmp.name] + logging.info("Running %s" % ' '.join(cmd)) + comp = subprocess.run(cmd, capture_output=True) + logging.info("Submitted %s", comp.stdout) + if comp.returncode != 0: + logging.error(comp.stderr.decode('utf-8')) + + return project + + + def start_fastq_to_fasta(self, collection, + tgt, + sample_id): + + params = { + "metadata": { + "class": "File", + "location": "keep:%s/metadata.yaml" % collection["portable_data_hash"] + }, + "ref_fasta": { + "class": "File", + "location": "keep:ffef6a3b77e5e04f8f62a7b6f67264d1+556/SARS-CoV2-NC_045512.2.fasta" + }, + "sample_id": sample_id } - newproject = run_workflow(api, analysis_project, fastq_workflow_uuid, "FASTQ to FASTA", params) - api.collections().update(uuid=collection["uuid"], - body={"owner_uuid": newproject["uuid"]}).execute() - -def start_pangenome_analysis(api, - analysis_project, - pangenome_workflow_uuid, - validated_project, - schema_ref, - exclude_list): - validated = arvados.util.list_all(api.collections().list, filters=[ - ["owner_uuid", "=", validated_project], - ["properties.status", "=", "validated"]]) - inputobj = { - "inputReads": [], - "metadata": [], - "subjects": [], - "metadataSchema": { - "class": "File", - "location": schema_ref - }, - "exclude": { - "class": "File", - "location": exclude_list + if tgt.startswith("reads.fastq"): + params["fastq_forward"] = { + "class": "File", + "location": "keep:%s/%s" % (collection["portable_data_hash"], tgt) + } + elif tgt.startswith("reads_1.fastq"): + params["fastq_forward"] = { + "class": "File", + "location": "keep:%s/reads_1.%s" % (collection["portable_data_hash"], tgt[8:]) + } + params["fastq_reverse"] = { + "class": "File", + "location": "keep:%s/reads_2.%s" % (collection["portable_data_hash"], tgt[8:]) + } + + newproject = self.run_workflow(self.fastq_project, self.fastq_workflow_uuid, "FASTQ to FASTA", params) + self.api.collections().update(uuid=collection["uuid"], + body={"owner_uuid": newproject["uuid"]}).execute() + + def start_pangenome_analysis(self): + + if self.schema_ref is None: + self.upload_schema() + + validated = arvados.util.list_all(self.api.collections().list, filters=[ + ["owner_uuid", "=", self.validated_project], + ["properties.status", "=", "validated"]]) + inputobj = { + "inputReads": [], + "metadata": [], + "subjects": [], + "metadataSchema": { + "class": "File", + "location": self.schema_ref + }, + "exclude": { + "class": "File", + "location": self.exclude_list + } } - } - validated.sort(key=lambda v: v["portable_data_hash"]) - for v in validated: - inputobj["inputReads"].append({ - "class": "File", - "location": "keep:%s/sequence.fasta" % v["portable_data_hash"] - }) - inputobj["metadata"].append({ - "class": "File", - "location": "keep:%s/metadata.yaml" % v["portable_data_hash"] - }) - inputobj["subjects"].append("http://collections.lugli.arvadosapi.com/c=%s/sequence.fasta" % v["portable_data_hash"]) - run_workflow(api, analysis_project, pangenome_workflow_uuid, "Pangenome analysis", inputobj) - - -def get_workflow_output_from_project(api, uuid): - cr = api.container_requests().list(filters=[['owner_uuid', '=', uuid], - ["requesting_container_uuid", "=", None]]).execute() - if cr["items"] and cr["items"][0]["output_uuid"]: - container = api.containers().get(uuid=cr["items"][0]["container_uuid"]).execute() - if container["state"] == "Complete" and container["exit_code"] == 0: - return cr["items"][0] - return None - - -def copy_most_recent_result(api, analysis_project, latest_result_uuid): - most_recent_analysis = api.groups().list(filters=[['owner_uuid', '=', analysis_project]], - order="created_at desc").execute() - for m in most_recent_analysis["items"]: - wf = get_workflow_output_from_project(api, m["uuid"]) - if wf: - src = api.collections().get(uuid=wf["output_uuid"]).execute() - dst = api.collections().get(uuid=latest_result_uuid).execute() - if src["portable_data_hash"] != dst["portable_data_hash"]: - logging.info("Copying latest result from '%s' to %s", m["name"], latest_result_uuid) - api.collections().update(uuid=latest_result_uuid, - body={"manifest_text": src["manifest_text"], - "description": "Result from %s %s" % (m["name"], wf["uuid"])}).execute() - break - + validated.sort(key=lambda v: v["portable_data_hash"]) + for v in validated: + inputobj["inputReads"].append({ + "class": "File", + "location": "keep:%s/sequence.fasta" % v["portable_data_hash"] + }) + inputobj["metadata"].append({ + "class": "File", + "location": "keep:%s/metadata.yaml" % v["portable_data_hash"] + }) + inputobj["subjects"].append("http://collections.lugli.arvadosapi.com/c=%s/sequence.fasta" % v["portable_data_hash"]) + self.run_workflow(self.pangenome_analysis_project, self.pangenome_workflow_uuid, "Pangenome analysis", inputobj) + + + def get_workflow_output_from_project(self, uuid): + cr = self.api.container_requests().list(filters=[['owner_uuid', '=', uuid], + ["requesting_container_uuid", "=", None]]).execute() + if cr["items"] and cr["items"][0]["output_uuid"]: + container = self.api.containers().get(uuid=cr["items"][0]["container_uuid"]).execute() + if container["state"] == "Complete" and container["exit_code"] == 0: + return cr["items"][0] + return None + + + def copy_most_recent_result(self): + most_recent_analysis = self.api.groups().list(filters=[['owner_uuid', '=', self.pangenome_analysis_project]], + order="created_at desc").execute() + for m in most_recent_analysis["items"]: + wf = self.get_workflow_output_from_project(m["uuid"]) + if wf: + src = self.api.collections().get(uuid=wf["output_uuid"]).execute() + dst = self.api.collections().get(uuid=self.latest_result_uuid).execute() + if src["portable_data_hash"] != dst["portable_data_hash"]: + logging.info("Copying latest result from '%s' to %s", m["name"], self.latest_result_uuid) + self.api.collections().update(uuid=self.latest_result_uuid, + body={"manifest_text": src["manifest_text"], + "description": "Result from %s %s" % (m["name"], wf["uuid"])}).execute() + break + + + def move_fastq_to_fasta_results(self): + projects = self.api.groups().list(filters=[['owner_uuid', '=', self.fastq_project], + ["properties.moved_output", "!=", True]], + order="created_at desc",).execute() + for p in projects["items"]: + wf = self.get_workflow_output_from_project(p["uuid"]) + if not wf: + continue -def move_fastq_to_fasta_results(api, analysis_project, uploader_project): - projects = api.groups().list(filters=[['owner_uuid', '=', analysis_project], - ["properties.moved_output", "!=", True]], - order="created_at desc",).execute() - for p in projects["items"]: - wf = get_workflow_output_from_project(api, p["uuid"]) - if wf: logging.info("Moving completed fastq2fasta result %s back to uploader project", wf["output_uuid"]) - api.collections().update(uuid=wf["output_uuid"], - body={"owner_uuid": uploader_project}).execute() + self.api.collections().update(uuid=wf["output_uuid"], + body={"owner_uuid": self.uploader_project}).execute() + + col = arvados.collection.Collection(wf["output_uuid"], api_client=self.api, keep_client=self.keepclient) + with col.open("metadata.yaml") as md: + metadata_content = ruamel.yaml.round_trip_load(md) + p["properties"]["moved_output"] = True - api.groups().update(uuid=p["uuid"], body={"properties": p["properties"]}).execute() + p["properties"]["sequence_label"] = metadata_content["sample"]["sample_id"] + self.api.groups().update(uuid=p["uuid"], body={"properties": p["properties"]}).execute() break -def upload_schema(api, workflow_def_project): - schema_resource = pkg_resources.resource_stream('bh20sequploader.qc_metadata', "bh20seq-schema.yml") - c = arvados.collection.Collection() - with c.open("schema.yml", "wb") as f: - f.write(schema_resource.read()) - pdh = c.portable_data_hash() - wd = api.collections().list(filters=[["owner_uuid", "=", workflow_def_project], - ["portable_data_hash", "=", pdh]]).execute() - if len(wd["items"]) == 0: - c.save_new(owner_uuid=workflow_def_project, name="Metadata schema", ensure_unique_name=True) - return "keep:%s/schema.yml" % pdh - - -def print_status(api, uploader_project, fmt): - pending = arvados.util.list_all(api.collections().list, filters=[["owner_uuid", "=", uploader_project]]) - out = [] - status = {} - for p in pending: - prop = p["properties"] - out.append(prop) - if "status" not in prop: - prop["status"] = "pending" - prop["created_at"] = p["created_at"] - prop["uuid"] = p["uuid"] - status[prop["status"]] = status.get(prop["status"], 0) + 1 - if fmt == "html": - print( -""" - - -""") - print("

Total collections in upload project %s

" % len(out)) - print("

Status %s

" % status) - print( -""" - - - - - -""") - for r in out: - print("") - print("" % (r["uuid"], r["uuid"])) - print("" % r["sequence_label"]) - print("" % r["status"]) - print("" % "\n".join(r.get("errors", []))) - print("") - print( -""" -
CollectionSequence labelStatusErrors
%s%s%s
%s
- - -""") - else: - print(json.dumps(out, indent=2)) + def upload_schema(self): + schema_resource = pkg_resources.resource_stream('bh20sequploader.qc_metadata', "bh20seq-schema.yml") + c = arvados.collection.Collection(api_client=self.api, keep_client=self.keepclient) + with c.open("schema.yml", "wb") as f: + f.write(schema_resource.read()) + pdh = c.portable_data_hash() + wd = self.api.collections().list(filters=[["owner_uuid", "=", self.workflow_def_project], + ["portable_data_hash", "=", pdh]]).execute() + if len(wd["items"]) == 0: + c.save_new(owner_uuid=self.workflow_def_project, name="Metadata schema", ensure_unique_name=True) + self.schema_ref = "keep:%s/schema.yml" % pdh + + + def print_status(self, fmt): + pending = arvados.util.list_all(self.api.collections().list, filters=[["owner_uuid", "=", self.uploader_project]]) + out = [] + status = {} + for p in pending: + prop = p["properties"] + out.append(prop) + if "status" not in prop: + prop["status"] = "pending" + prop["created_at"] = p["created_at"] + prop["uuid"] = p["uuid"] + status[prop["status"]] = status.get(prop["status"], 0) + 1 + if fmt == "html": + print( + """ + + + """) + print("

Total collections in upload project %s

" % len(out)) + print("

Status %s

" % status) + print( + """ + + + + + + """) + for r in out: + print("") + print("" % (r["uuid"], r["uuid"])) + print("" % r["sequence_label"]) + print("" % r["status"]) + print("" % "\n".join(r.get("errors", []))) + print("") + print( + """ +
CollectionSequence labelStatusErrors
%s%s%s
%s
+ + + """) + else: + print(json.dumps(out, indent=2)) def main(): parser = argparse.ArgumentParser(description='Analyze collections uploaded to a project') @@ -310,50 +339,42 @@ def main(): args = parser.parse_args() api = arvados.api() - - - - schema_ref = upload_schema(api, args.workflow_def_project) + keepclient = arvados.keep.KeepClient(api_client=api) + + seqanalyzer = SeqAnalyzer(api, keepclient, + args.uploader_project, + args.pangenome_analysis_project, + args.fastq_project, + args.validated_project, + args.workflow_def_project, + args.pangenome_workflow_uuid, + args.fastq_workflow_uuid, + args.exclude_list, + args.latest_result_collection) if args.kickoff: logging.info("Starting a single analysis run") - start_pangenome_analysis(api, - args.pangenome_analysis_project, - args.pangenome_workflow_uuid, - args.validated_project, - schema_ref, - args.exclude_list) + seqanalyzer.start_pangenome_analysis() return if args.print_status: - print_status(api, args.uploader_project, args.print_status) + seqanalyzer.print_status(args.print_status) exit(0) logging.info("Starting up, monitoring %s for uploads" % (args.uploader_project)) while True: - move_fastq_to_fasta_results(api, args.fastq_project, args.uploader_project) + seqanalyzer.move_fastq_to_fasta_results() new_collections = arvados.util.list_all(api.collections().list, filters=[["owner_uuid", "=", args.uploader_project]]) at_least_one_new_valid_seq = False for c in new_collections: - at_least_one_new_valid_seq = validate_upload(api, c, - args.validated_project, - args.fastq_project, - args.fastq_workflow_uuid, - args.revalidate) or at_least_one_new_valid_seq + at_least_one_new_valid_seq = seqanalyzer.validate_upload(c, args.revalidate) or at_least_one_new_valid_seq if at_least_one_new_valid_seq and not args.no_start_analysis: - start_pangenome_analysis(api, - args.pangenome_analysis_project, - args.pangenome_workflow_uuid, - args.validated_project, - schema_ref, - args.exclude_list) - - copy_most_recent_result(api, - args.pangenome_analysis_project, - args.latest_result_collection) + seqanalyzer.start_pangenome_analysis() + + seqanalyzer.copy_most_recent_result() if args.once: break diff --git a/bh20simplewebuploader/main.py b/bh20simplewebuploader/main.py index 8089883..3173d60 100644 --- a/bh20simplewebuploader/main.py +++ b/bh20simplewebuploader/main.py @@ -479,10 +479,13 @@ def pending_table(output, items): for r in items: if r["status"] != "pending": continue - output.write("") - output.write("%s" % (r["uuid"], r["uuid"])) - output.write("%s" % Markup.escape(r["sequence_label"])) - output.write("") + try: + output.write("") + output.write("%s" % (r["uuid"], r["uuid"])) + output.write("%s" % Markup.escape(r.get("sequence_label"))) + output.write("") + except: + pass output.write( """ @@ -497,13 +500,16 @@ def rejected_table(output, items): Errors """) for r in items: - if r["status"] != "rejected": - continue - output.write("") - output.write("%s" % (r["uuid"], r["uuid"])) - output.write("%s" % Markup.escape(r["sequence_label"])) - output.write("
%s
" % Markup.escape("\n".join(r.get("errors", [])))) - output.write("") + try: + if r["status"] != "rejected": + continue + output.write("") + output.write("%s" % (r["uuid"], r["uuid"])) + output.write("%s" % Markup.escape(r.get("sequence_label"))) + output.write("
%s
" % Markup.escape("\n".join(r.get("errors", [])))) + output.write("") + except: + pass output.write( """ -- cgit v1.2.3