From 2cb5faa2b088cf36c8c41633db137fc020a34529 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Wed, 11 Nov 2020 17:38:16 -0500 Subject: Support uploading new metadata only Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- bh20seqanalyzer/main.py | 182 ++++++++++++++++++++++++------------------------ bh20sequploader/main.py | 48 +++++++------ 2 files changed, 116 insertions(+), 114 deletions(-) diff --git a/bh20seqanalyzer/main.py b/bh20seqanalyzer/main.py index b54a746..7176fe8 100644 --- a/bh20seqanalyzer/main.py +++ b/bh20seqanalyzer/main.py @@ -45,99 +45,97 @@ class SeqAnalyzer: if not revalidate and collection["properties"].get("status") in ("validated", "rejected"): return False - col = arvados.collection.Collection(collection["uuid"], api_client=self.api, keep_client=self.keepclient) - - # validate the collection here. Check metadata, etc. - logging.info("Validating upload '%s' (%s)" % (collection["name"], collection["uuid"])) - - errors = [] - - if collection["owner_uuid"] != self.validated_project: - dup = self.api.collections().list(filters=[["owner_uuid", "=", self.validated_project], - ["portable_data_hash", "=", col.portable_data_hash()]]).execute() - if dup["items"]: - # This exact collection has been uploaded before. - errors.append("Duplicate of %s" % ([d["uuid"] for d in dup["items"]])) - - if not errors: - if "metadata.yaml" not in col: - errors.append("Missing metadata.yaml", collection["name"]) - else: + with arvados.collection.CollectionReader(collection["uuid"], api_client=self.api, keep_client=self.keepclient) as col: + # validate the collection here. Check metadata, etc. + logging.info("Validating upload '%s' (%s)" % (collection["name"], collection["uuid"])) + + errors = [] + + if collection["owner_uuid"] != self.validated_project: + dup = self.api.collections().list(filters=[["owner_uuid", "=", self.validated_project], + ["portable_data_hash", "=", col.portable_data_hash()]]).execute() + if dup["items"]: + # This exact collection has been uploaded before. + errors.append("Duplicate of %s" % ([d["uuid"] for d in dup["items"]])) + + if not errors: + if "metadata.yaml" not in col: + errors.append("%s missing metadata.yaml" % collection["name"]) + else: + try: + with col.open("metadata.yaml") as md: + metadata_content = ruamel.yaml.round_trip_load(md) + metadata_content["id"] = "http://covid19.genenetwork.org/resource/%s" % collection["uuid"] + sample_id = metadata_content["sample"]["sample_id"] + add_lc_filename(metadata_content, metadata_content["id"]) + valid = qc_metadata(metadata_content) + if not valid: + errors.append("Failed metadata qc") + except Exception as e: + errors.append(str(e)) + + existing = self.api.collections().list(filters=[["owner_uuid", "=", self.validated_project], + ["properties.sequence_label", "=", sample_id]]).execute() + + if not errors: try: - with col.open("metadata.yaml") as md: - metadata_content = ruamel.yaml.round_trip_load(md) - metadata_content["id"] = "http://arvados.org/keep:%s/metadata.yaml" % collection["portable_data_hash"] - sample_id = metadata_content["sample"]["sample_id"] - add_lc_filename(metadata_content, metadata_content["id"]) - valid = qc_metadata(metadata_content) - if not valid: - errors.append("Failed metadata qc") - except Exception as e: - errors.append(str(e)) - - if not errors: - try: - tgt = None - paired = {"reads_1.fastq": "reads.fastq", "reads_1.fastq.gz": "reads.fastq.gz"} - for n in ("sequence.fasta", "reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"): - if n not in col: - continue - with col.open(n, 'rb') as qf: - tgt, seqlabel, seq_type = qc_fasta(qf) - if tgt != n and tgt != paired.get(n): - errors.append("Expected %s but magic says it should be %s", n, tgt) - elif tgt in ("reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"): - self.start_fastq_to_fasta(collection, n, sample_id) - return False - - # If it is a FASTA - if sample_id != seqlabel: - errors.append("Expected sample_id == seqlabel, but %s != %s" % (sample_id, seqlabel)) - if tgt is None: - errors.append("Upload '%s' does not contain sequence.fasta, reads.fastq or reads_1.fastq", collection["name"]) - except Exception as v: - errors.append(str(v)) - - if errors: - # It is invalid - logging.warn("'%s' (%s) has validation errors: %s" % ( - collection["name"], collection["uuid"], "\n".join(errors))) - collection["properties"]["status"] = "rejected" - collection["properties"]["errors"] = errors - self.api.collections().update(uuid=collection["uuid"], body={"properties": collection["properties"]}).execute() - return False - - existing = self.api.collections().list(filters=[["owner_uuid", "=", self.validated_project], - ["properties.sequence_label", "=", sample_id]]).execute() - - update_from = None - if existing["items"]: - # "collection" is the newly uploaded one we're looking at - update_from = collection - collection = existing["items"][0] - collection["properties"] = update_from["properties"] - - if "errors" in collection["properties"]: - del collection["properties"]["errors"] - collection["properties"]["status"] = "validated" - collection["properties"]["sequence_label"] = sample_id - - if update_from: - self.api.collections().update(uuid=collection["uuid"], body={ - "properties": collection["properties"], - "manifest_text": col.manifest_text() - }).execute() - self.api.collections().delete(uuid=update_from["uuid"]).execute() - logging.info("Updated '%s' in validated sequences" % collection["name"]) - else: - # Move it to the "validated" project to be included in the next analysis - self.api.collections().update(uuid=collection["uuid"], body={ - "owner_uuid": self.validated_project, - "name": "%s (%s)" % (collection["name"], time.asctime(time.gmtime())), - "properties": collection["properties"]}).execute() - logging.info("Added '%s' to validated sequences" % collection["name"]) - - return True + tgt = None + paired = {"reads_1.fastq": "reads.fastq", "reads_1.fastq.gz": "reads.fastq.gz"} + for n in ("sequence.fasta", "reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"): + if n not in col: + continue + with col.open(n, 'rb') as qf: + tgt, seqlabel, seq_type = qc_fasta(qf) + if tgt != n and tgt != paired.get(n): + errors.append("Expected %s but magic says it should be %s" % (n, tgt)) + elif tgt in ("reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"): + self.start_fastq_to_fasta(collection, n, sample_id) + return False + + # If it is a FASTA + if sample_id != seqlabel: + errors.append("Expected sample_id == seqlabel, but %s != %s" % (sample_id, seqlabel)) + if tgt is None and len(existing["items"]) == 0: + errors.append("Upload '%s' does not contain sequence.fasta, reads.fastq or reads_1.fastq" % collection["name"]) + except Exception as v: + errors.append(str(v)) + + if errors: + # It is invalid + logging.warn("'%s' (%s) has validation errors: %s" % ( + collection["name"], collection["uuid"], "\n".join(errors))) + collection["properties"]["status"] = "rejected" + collection["properties"]["errors"] = errors + self.api.collections().update(uuid=collection["uuid"], body={"properties": collection["properties"]}).execute() + return False + + update_from = None + if existing["items"]: + # "collection" is the newly uploaded one we're looking at + update_from = collection + collection = existing["items"][0] + collection["properties"] = update_from["properties"] + + if "errors" in collection["properties"]: + del collection["properties"]["errors"] + collection["properties"]["status"] = "validated" + collection["properties"]["sequence_label"] = sample_id + + if update_from: + with arvados.collection.Collection(collection["uuid"], api_client=self.api, keep_client=self.keepclient) as update_existing_col: + update_existing_col.copy("metadata.yaml", "metadata.yaml", source_collection=col, overwrite=True) + update_existing_col.save(properties=collection["properties"]) + self.api.collections().delete(uuid=update_from["uuid"]).execute() + logging.info("Updated '%s' in validated sequences" % collection["name"]) + else: + # Move it to the "validated" project to be included in the next analysis + self.api.collections().update(uuid=collection["uuid"], body={ + "owner_uuid": self.validated_project, + "name": "%s (%s)" % (collection["name"], time.asctime(time.gmtime())), + "properties": collection["properties"]}).execute() + logging.info("Added '%s' to validated sequences" % collection["name"]) + + return True def run_workflow(self, parent_project, workflow_uuid, name, inputobj): diff --git a/bh20sequploader/main.py b/bh20sequploader/main.py index ea0fa70..e2f089f 100644 --- a/bh20sequploader/main.py +++ b/bh20sequploader/main.py @@ -52,23 +52,23 @@ sequence for enough overlap with the reference genome failed = True # continue with the FASTA checker target = [] - try: - log.debug("FASTA/FASTQ QC" if do_qc else "Limited FASTA/FASTQ QC") - target.append(qc_fasta(sequence_p1, check_with_mimimap2=do_qc)) - if sequence_p2: - if target[0][2] == 'text/fasta': - raise ValueError("It is possible to upload just one FASTA file at a time") - target.append(qc_fasta(sequence_p2)) - - target[0] = ("reads_1."+target[0][0][6:], target[0][1], target[0][2]) - target[1] = ("reads_2."+target[1][0][6:], target[1][1], target[1][2]) - - if do_qc and target[0][2] == 'text/fasta' and sample_id != target[0][1]: - raise ValueError(f"The sample_id field in the metadata ({sample_id}) must be the same as the FASTA header ({target[0][1]})") - - except Exception as e: - log.exception("Failed sequence QC") - failed = True + if sequence_p1: + try: + log.debug("FASTA/FASTQ QC" if do_qc else "Limited FASTA/FASTQ QC") + target.append(qc_fasta(sequence_p1, check_with_mimimap2=do_qc)) + if sequence_p2: + if target[0][2] == 'text/fasta': + raise ValueError("It is possible to upload just one FASTA file at a time") + target.append(qc_fasta(sequence_p2)) + + target[0] = ("reads_1."+target[0][0][6:], target[0][1], target[0][2]) + target[1] = ("reads_2."+target[1][0][6:], target[1][1], target[1][2]) + + if do_qc and target[0][2] == 'text/fasta' and sample_id != target[0][1]: + raise ValueError(f"The sample_id field in the metadata ({sample_id}) must be the same as the FASTA header ({target[0][1]})") + except Exception as e: + log.exception("Failed sequence QC") + failed = True if failed: log.debug("Bailing out!") @@ -87,7 +87,7 @@ def upload_sequence(col, target, sequence): def main(): parser = argparse.ArgumentParser(description='Upload SARS-CoV-19 sequences for analysis') parser.add_argument('metadata', type=argparse.FileType('r'), help='sequence metadata json') - parser.add_argument('sequence_p1', type=argparse.FileType('rb'), help='sequence FASTA/FASTQ') + parser.add_argument('sequence_p1', type=argparse.FileType('rb'), default=None, nargs='?', help='sequence FASTA/FASTQ') parser.add_argument('sequence_p2', type=argparse.FileType('rb'), default=None, nargs='?', help='sequence FASTQ pair') parser.add_argument("--validate", action="store_true", help="Dry run, validate only") parser.add_argument("--skip-qc", action="store_true", help="Skip local qc check") @@ -102,7 +102,10 @@ def main(): # ---- First the QC target = qc_stuff(args.metadata, args.sequence_p1, args.sequence_p2, not args.skip_qc) - seqlabel = target[0][1] + if target: + seqlabel = target[0][1] + else: + seqlabel = "" if args.validate: log.info("Valid") @@ -111,9 +114,10 @@ def main(): col = arvados.collection.Collection(api_client=api) # ---- Upload the sequence to Arvados - upload_sequence(col, target[0], args.sequence_p1) - if args.sequence_p2: - upload_sequence(col, target[1], args.sequence_p2) + if args.sequence_p1: + upload_sequence(col, target[0], args.sequence_p1) + if args.sequence_p2: + upload_sequence(col, target[1], args.sequence_p2) # ---- Make sure the metadata YAML is valid log.info("Reading metadata") -- cgit v1.2.3