aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPeter Amstutz2020-11-11 17:38:16 -0500
committerPeter Amstutz2020-11-11 17:38:47 -0500
commit2cb5faa2b088cf36c8c41633db137fc020a34529 (patch)
tree6a9f39513e020517f2bac609aaf2e15128411a70
parentc5c730cdeb9f2e9a02e30c2215dfe2b6ae787b07 (diff)
downloadbh20-seq-resource-2cb5faa2b088cf36c8c41633db137fc020a34529.tar.gz
bh20-seq-resource-2cb5faa2b088cf36c8c41633db137fc020a34529.tar.lz
bh20-seq-resource-2cb5faa2b088cf36c8c41633db137fc020a34529.zip
Support uploading new metadata only
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>
-rw-r--r--bh20seqanalyzer/main.py182
-rw-r--r--bh20sequploader/main.py48
2 files changed, 116 insertions, 114 deletions
diff --git a/bh20seqanalyzer/main.py b/bh20seqanalyzer/main.py
index b54a746..7176fe8 100644
--- a/bh20seqanalyzer/main.py
+++ b/bh20seqanalyzer/main.py
@@ -45,99 +45,97 @@ class SeqAnalyzer:
if not revalidate and collection["properties"].get("status") in ("validated", "rejected"):
return False
- col = arvados.collection.Collection(collection["uuid"], api_client=self.api, keep_client=self.keepclient)
-
- # validate the collection here. Check metadata, etc.
- logging.info("Validating upload '%s' (%s)" % (collection["name"], collection["uuid"]))
-
- errors = []
-
- if collection["owner_uuid"] != self.validated_project:
- dup = self.api.collections().list(filters=[["owner_uuid", "=", self.validated_project],
- ["portable_data_hash", "=", col.portable_data_hash()]]).execute()
- if dup["items"]:
- # This exact collection has been uploaded before.
- errors.append("Duplicate of %s" % ([d["uuid"] for d in dup["items"]]))
-
- if not errors:
- if "metadata.yaml" not in col:
- errors.append("Missing metadata.yaml", collection["name"])
- else:
+ with arvados.collection.CollectionReader(collection["uuid"], api_client=self.api, keep_client=self.keepclient) as col:
+ # validate the collection here. Check metadata, etc.
+ logging.info("Validating upload '%s' (%s)" % (collection["name"], collection["uuid"]))
+
+ errors = []
+
+ if collection["owner_uuid"] != self.validated_project:
+ dup = self.api.collections().list(filters=[["owner_uuid", "=", self.validated_project],
+ ["portable_data_hash", "=", col.portable_data_hash()]]).execute()
+ if dup["items"]:
+ # This exact collection has been uploaded before.
+ errors.append("Duplicate of %s" % ([d["uuid"] for d in dup["items"]]))
+
+ if not errors:
+ if "metadata.yaml" not in col:
+ errors.append("%s missing metadata.yaml" % collection["name"])
+ else:
+ try:
+ with col.open("metadata.yaml") as md:
+ metadata_content = ruamel.yaml.round_trip_load(md)
+ metadata_content["id"] = "http://covid19.genenetwork.org/resource/%s" % collection["uuid"]
+ sample_id = metadata_content["sample"]["sample_id"]
+ add_lc_filename(metadata_content, metadata_content["id"])
+ valid = qc_metadata(metadata_content)
+ if not valid:
+ errors.append("Failed metadata qc")
+ except Exception as e:
+ errors.append(str(e))
+
+ existing = self.api.collections().list(filters=[["owner_uuid", "=", self.validated_project],
+ ["properties.sequence_label", "=", sample_id]]).execute()
+
+ if not errors:
try:
- with col.open("metadata.yaml") as md:
- metadata_content = ruamel.yaml.round_trip_load(md)
- metadata_content["id"] = "http://arvados.org/keep:%s/metadata.yaml" % collection["portable_data_hash"]
- sample_id = metadata_content["sample"]["sample_id"]
- add_lc_filename(metadata_content, metadata_content["id"])
- valid = qc_metadata(metadata_content)
- if not valid:
- errors.append("Failed metadata qc")
- except Exception as e:
- errors.append(str(e))
-
- if not errors:
- try:
- tgt = None
- paired = {"reads_1.fastq": "reads.fastq", "reads_1.fastq.gz": "reads.fastq.gz"}
- for n in ("sequence.fasta", "reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"):
- if n not in col:
- continue
- with col.open(n, 'rb') as qf:
- tgt, seqlabel, seq_type = qc_fasta(qf)
- if tgt != n and tgt != paired.get(n):
- errors.append("Expected %s but magic says it should be %s", n, tgt)
- elif tgt in ("reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"):
- self.start_fastq_to_fasta(collection, n, sample_id)
- return False
-
- # If it is a FASTA
- if sample_id != seqlabel:
- errors.append("Expected sample_id == seqlabel, but %s != %s" % (sample_id, seqlabel))
- if tgt is None:
- errors.append("Upload '%s' does not contain sequence.fasta, reads.fastq or reads_1.fastq", collection["name"])
- except Exception as v:
- errors.append(str(v))
-
- if errors:
- # It is invalid
- logging.warn("'%s' (%s) has validation errors: %s" % (
- collection["name"], collection["uuid"], "\n".join(errors)))
- collection["properties"]["status"] = "rejected"
- collection["properties"]["errors"] = errors
- self.api.collections().update(uuid=collection["uuid"], body={"properties": collection["properties"]}).execute()
- return False
-
- existing = self.api.collections().list(filters=[["owner_uuid", "=", self.validated_project],
- ["properties.sequence_label", "=", sample_id]]).execute()
-
- update_from = None
- if existing["items"]:
- # "collection" is the newly uploaded one we're looking at
- update_from = collection
- collection = existing["items"][0]
- collection["properties"] = update_from["properties"]
-
- if "errors" in collection["properties"]:
- del collection["properties"]["errors"]
- collection["properties"]["status"] = "validated"
- collection["properties"]["sequence_label"] = sample_id
-
- if update_from:
- self.api.collections().update(uuid=collection["uuid"], body={
- "properties": collection["properties"],
- "manifest_text": col.manifest_text()
- }).execute()
- self.api.collections().delete(uuid=update_from["uuid"]).execute()
- logging.info("Updated '%s' in validated sequences" % collection["name"])
- else:
- # Move it to the "validated" project to be included in the next analysis
- self.api.collections().update(uuid=collection["uuid"], body={
- "owner_uuid": self.validated_project,
- "name": "%s (%s)" % (collection["name"], time.asctime(time.gmtime())),
- "properties": collection["properties"]}).execute()
- logging.info("Added '%s' to validated sequences" % collection["name"])
-
- return True
+ tgt = None
+ paired = {"reads_1.fastq": "reads.fastq", "reads_1.fastq.gz": "reads.fastq.gz"}
+ for n in ("sequence.fasta", "reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"):
+ if n not in col:
+ continue
+ with col.open(n, 'rb') as qf:
+ tgt, seqlabel, seq_type = qc_fasta(qf)
+ if tgt != n and tgt != paired.get(n):
+ errors.append("Expected %s but magic says it should be %s" % (n, tgt))
+ elif tgt in ("reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"):
+ self.start_fastq_to_fasta(collection, n, sample_id)
+ return False
+
+ # If it is a FASTA
+ if sample_id != seqlabel:
+ errors.append("Expected sample_id == seqlabel, but %s != %s" % (sample_id, seqlabel))
+ if tgt is None and len(existing["items"]) == 0:
+ errors.append("Upload '%s' does not contain sequence.fasta, reads.fastq or reads_1.fastq" % collection["name"])
+ except Exception as v:
+ errors.append(str(v))
+
+ if errors:
+ # It is invalid
+ logging.warn("'%s' (%s) has validation errors: %s" % (
+ collection["name"], collection["uuid"], "\n".join(errors)))
+ collection["properties"]["status"] = "rejected"
+ collection["properties"]["errors"] = errors
+ self.api.collections().update(uuid=collection["uuid"], body={"properties": collection["properties"]}).execute()
+ return False
+
+ update_from = None
+ if existing["items"]:
+ # "collection" is the newly uploaded one we're looking at
+ update_from = collection
+ collection = existing["items"][0]
+ collection["properties"] = update_from["properties"]
+
+ if "errors" in collection["properties"]:
+ del collection["properties"]["errors"]
+ collection["properties"]["status"] = "validated"
+ collection["properties"]["sequence_label"] = sample_id
+
+ if update_from:
+ with arvados.collection.Collection(collection["uuid"], api_client=self.api, keep_client=self.keepclient) as update_existing_col:
+ update_existing_col.copy("metadata.yaml", "metadata.yaml", source_collection=col, overwrite=True)
+ update_existing_col.save(properties=collection["properties"])
+ self.api.collections().delete(uuid=update_from["uuid"]).execute()
+ logging.info("Updated '%s' in validated sequences" % collection["name"])
+ else:
+ # Move it to the "validated" project to be included in the next analysis
+ self.api.collections().update(uuid=collection["uuid"], body={
+ "owner_uuid": self.validated_project,
+ "name": "%s (%s)" % (collection["name"], time.asctime(time.gmtime())),
+ "properties": collection["properties"]}).execute()
+ logging.info("Added '%s' to validated sequences" % collection["name"])
+
+ return True
def run_workflow(self, parent_project, workflow_uuid, name, inputobj):
diff --git a/bh20sequploader/main.py b/bh20sequploader/main.py
index ea0fa70..e2f089f 100644
--- a/bh20sequploader/main.py
+++ b/bh20sequploader/main.py
@@ -52,23 +52,23 @@ sequence for enough overlap with the reference genome
failed = True # continue with the FASTA checker
target = []
- try:
- log.debug("FASTA/FASTQ QC" if do_qc else "Limited FASTA/FASTQ QC")
- target.append(qc_fasta(sequence_p1, check_with_mimimap2=do_qc))
- if sequence_p2:
- if target[0][2] == 'text/fasta':
- raise ValueError("It is possible to upload just one FASTA file at a time")
- target.append(qc_fasta(sequence_p2))
-
- target[0] = ("reads_1."+target[0][0][6:], target[0][1], target[0][2])
- target[1] = ("reads_2."+target[1][0][6:], target[1][1], target[1][2])
-
- if do_qc and target[0][2] == 'text/fasta' and sample_id != target[0][1]:
- raise ValueError(f"The sample_id field in the metadata ({sample_id}) must be the same as the FASTA header ({target[0][1]})")
-
- except Exception as e:
- log.exception("Failed sequence QC")
- failed = True
+ if sequence_p1:
+ try:
+ log.debug("FASTA/FASTQ QC" if do_qc else "Limited FASTA/FASTQ QC")
+ target.append(qc_fasta(sequence_p1, check_with_mimimap2=do_qc))
+ if sequence_p2:
+ if target[0][2] == 'text/fasta':
+ raise ValueError("It is possible to upload just one FASTA file at a time")
+ target.append(qc_fasta(sequence_p2))
+
+ target[0] = ("reads_1."+target[0][0][6:], target[0][1], target[0][2])
+ target[1] = ("reads_2."+target[1][0][6:], target[1][1], target[1][2])
+
+ if do_qc and target[0][2] == 'text/fasta' and sample_id != target[0][1]:
+ raise ValueError(f"The sample_id field in the metadata ({sample_id}) must be the same as the FASTA header ({target[0][1]})")
+ except Exception as e:
+ log.exception("Failed sequence QC")
+ failed = True
if failed:
log.debug("Bailing out!")
@@ -87,7 +87,7 @@ def upload_sequence(col, target, sequence):
def main():
parser = argparse.ArgumentParser(description='Upload SARS-CoV-19 sequences for analysis')
parser.add_argument('metadata', type=argparse.FileType('r'), help='sequence metadata json')
- parser.add_argument('sequence_p1', type=argparse.FileType('rb'), help='sequence FASTA/FASTQ')
+ parser.add_argument('sequence_p1', type=argparse.FileType('rb'), default=None, nargs='?', help='sequence FASTA/FASTQ')
parser.add_argument('sequence_p2', type=argparse.FileType('rb'), default=None, nargs='?', help='sequence FASTQ pair')
parser.add_argument("--validate", action="store_true", help="Dry run, validate only")
parser.add_argument("--skip-qc", action="store_true", help="Skip local qc check")
@@ -102,7 +102,10 @@ def main():
# ---- First the QC
target = qc_stuff(args.metadata, args.sequence_p1, args.sequence_p2, not args.skip_qc)
- seqlabel = target[0][1]
+ if target:
+ seqlabel = target[0][1]
+ else:
+ seqlabel = ""
if args.validate:
log.info("Valid")
@@ -111,9 +114,10 @@ def main():
col = arvados.collection.Collection(api_client=api)
# ---- Upload the sequence to Arvados
- upload_sequence(col, target[0], args.sequence_p1)
- if args.sequence_p2:
- upload_sequence(col, target[1], args.sequence_p2)
+ if args.sequence_p1:
+ upload_sequence(col, target[0], args.sequence_p1)
+ if args.sequence_p2:
+ upload_sequence(col, target[1], args.sequence_p2)
# ---- Make sure the metadata YAML is valid
log.info("Reading metadata")