From 2cb5faa2b088cf36c8c41633db137fc020a34529 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Wed, 11 Nov 2020 17:38:16 -0500 Subject: Support uploading new metadata only Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- bh20seqanalyzer/main.py | 182 ++++++++++++++++++++++++------------------------ 1 file changed, 90 insertions(+), 92 deletions(-) (limited to 'bh20seqanalyzer') diff --git a/bh20seqanalyzer/main.py b/bh20seqanalyzer/main.py index b54a746..7176fe8 100644 --- a/bh20seqanalyzer/main.py +++ b/bh20seqanalyzer/main.py @@ -45,99 +45,97 @@ class SeqAnalyzer: if not revalidate and collection["properties"].get("status") in ("validated", "rejected"): return False - col = arvados.collection.Collection(collection["uuid"], api_client=self.api, keep_client=self.keepclient) - - # validate the collection here. Check metadata, etc. - logging.info("Validating upload '%s' (%s)" % (collection["name"], collection["uuid"])) - - errors = [] - - if collection["owner_uuid"] != self.validated_project: - dup = self.api.collections().list(filters=[["owner_uuid", "=", self.validated_project], - ["portable_data_hash", "=", col.portable_data_hash()]]).execute() - if dup["items"]: - # This exact collection has been uploaded before. - errors.append("Duplicate of %s" % ([d["uuid"] for d in dup["items"]])) - - if not errors: - if "metadata.yaml" not in col: - errors.append("Missing metadata.yaml", collection["name"]) - else: + with arvados.collection.CollectionReader(collection["uuid"], api_client=self.api, keep_client=self.keepclient) as col: + # validate the collection here. Check metadata, etc. + logging.info("Validating upload '%s' (%s)" % (collection["name"], collection["uuid"])) + + errors = [] + + if collection["owner_uuid"] != self.validated_project: + dup = self.api.collections().list(filters=[["owner_uuid", "=", self.validated_project], + ["portable_data_hash", "=", col.portable_data_hash()]]).execute() + if dup["items"]: + # This exact collection has been uploaded before. + errors.append("Duplicate of %s" % ([d["uuid"] for d in dup["items"]])) + + if not errors: + if "metadata.yaml" not in col: + errors.append("%s missing metadata.yaml" % collection["name"]) + else: + try: + with col.open("metadata.yaml") as md: + metadata_content = ruamel.yaml.round_trip_load(md) + metadata_content["id"] = "http://covid19.genenetwork.org/resource/%s" % collection["uuid"] + sample_id = metadata_content["sample"]["sample_id"] + add_lc_filename(metadata_content, metadata_content["id"]) + valid = qc_metadata(metadata_content) + if not valid: + errors.append("Failed metadata qc") + except Exception as e: + errors.append(str(e)) + + existing = self.api.collections().list(filters=[["owner_uuid", "=", self.validated_project], + ["properties.sequence_label", "=", sample_id]]).execute() + + if not errors: try: - with col.open("metadata.yaml") as md: - metadata_content = ruamel.yaml.round_trip_load(md) - metadata_content["id"] = "http://arvados.org/keep:%s/metadata.yaml" % collection["portable_data_hash"] - sample_id = metadata_content["sample"]["sample_id"] - add_lc_filename(metadata_content, metadata_content["id"]) - valid = qc_metadata(metadata_content) - if not valid: - errors.append("Failed metadata qc") - except Exception as e: - errors.append(str(e)) - - if not errors: - try: - tgt = None - paired = {"reads_1.fastq": "reads.fastq", "reads_1.fastq.gz": "reads.fastq.gz"} - for n in ("sequence.fasta", "reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"): - if n not in col: - continue - with col.open(n, 'rb') as qf: - tgt, seqlabel, seq_type = qc_fasta(qf) - if tgt != n and tgt != paired.get(n): - errors.append("Expected %s but magic says it should be %s", n, tgt) - elif tgt in ("reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"): - self.start_fastq_to_fasta(collection, n, sample_id) - return False - - # If it is a FASTA - if sample_id != seqlabel: - errors.append("Expected sample_id == seqlabel, but %s != %s" % (sample_id, seqlabel)) - if tgt is None: - errors.append("Upload '%s' does not contain sequence.fasta, reads.fastq or reads_1.fastq", collection["name"]) - except Exception as v: - errors.append(str(v)) - - if errors: - # It is invalid - logging.warn("'%s' (%s) has validation errors: %s" % ( - collection["name"], collection["uuid"], "\n".join(errors))) - collection["properties"]["status"] = "rejected" - collection["properties"]["errors"] = errors - self.api.collections().update(uuid=collection["uuid"], body={"properties": collection["properties"]}).execute() - return False - - existing = self.api.collections().list(filters=[["owner_uuid", "=", self.validated_project], - ["properties.sequence_label", "=", sample_id]]).execute() - - update_from = None - if existing["items"]: - # "collection" is the newly uploaded one we're looking at - update_from = collection - collection = existing["items"][0] - collection["properties"] = update_from["properties"] - - if "errors" in collection["properties"]: - del collection["properties"]["errors"] - collection["properties"]["status"] = "validated" - collection["properties"]["sequence_label"] = sample_id - - if update_from: - self.api.collections().update(uuid=collection["uuid"], body={ - "properties": collection["properties"], - "manifest_text": col.manifest_text() - }).execute() - self.api.collections().delete(uuid=update_from["uuid"]).execute() - logging.info("Updated '%s' in validated sequences" % collection["name"]) - else: - # Move it to the "validated" project to be included in the next analysis - self.api.collections().update(uuid=collection["uuid"], body={ - "owner_uuid": self.validated_project, - "name": "%s (%s)" % (collection["name"], time.asctime(time.gmtime())), - "properties": collection["properties"]}).execute() - logging.info("Added '%s' to validated sequences" % collection["name"]) - - return True + tgt = None + paired = {"reads_1.fastq": "reads.fastq", "reads_1.fastq.gz": "reads.fastq.gz"} + for n in ("sequence.fasta", "reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"): + if n not in col: + continue + with col.open(n, 'rb') as qf: + tgt, seqlabel, seq_type = qc_fasta(qf) + if tgt != n and tgt != paired.get(n): + errors.append("Expected %s but magic says it should be %s" % (n, tgt)) + elif tgt in ("reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"): + self.start_fastq_to_fasta(collection, n, sample_id) + return False + + # If it is a FASTA + if sample_id != seqlabel: + errors.append("Expected sample_id == seqlabel, but %s != %s" % (sample_id, seqlabel)) + if tgt is None and len(existing["items"]) == 0: + errors.append("Upload '%s' does not contain sequence.fasta, reads.fastq or reads_1.fastq" % collection["name"]) + except Exception as v: + errors.append(str(v)) + + if errors: + # It is invalid + logging.warn("'%s' (%s) has validation errors: %s" % ( + collection["name"], collection["uuid"], "\n".join(errors))) + collection["properties"]["status"] = "rejected" + collection["properties"]["errors"] = errors + self.api.collections().update(uuid=collection["uuid"], body={"properties": collection["properties"]}).execute() + return False + + update_from = None + if existing["items"]: + # "collection" is the newly uploaded one we're looking at + update_from = collection + collection = existing["items"][0] + collection["properties"] = update_from["properties"] + + if "errors" in collection["properties"]: + del collection["properties"]["errors"] + collection["properties"]["status"] = "validated" + collection["properties"]["sequence_label"] = sample_id + + if update_from: + with arvados.collection.Collection(collection["uuid"], api_client=self.api, keep_client=self.keepclient) as update_existing_col: + update_existing_col.copy("metadata.yaml", "metadata.yaml", source_collection=col, overwrite=True) + update_existing_col.save(properties=collection["properties"]) + self.api.collections().delete(uuid=update_from["uuid"]).execute() + logging.info("Updated '%s' in validated sequences" % collection["name"]) + else: + # Move it to the "validated" project to be included in the next analysis + self.api.collections().update(uuid=collection["uuid"], body={ + "owner_uuid": self.validated_project, + "name": "%s (%s)" % (collection["name"], time.asctime(time.gmtime())), + "properties": collection["properties"]}).execute() + logging.info("Added '%s' to validated sequences" % collection["name"]) + + return True def run_workflow(self, parent_project, workflow_uuid, name, inputobj): -- cgit v1.2.3