diff options
Diffstat (limited to 'bh20seqanalyzer')
-rw-r--r-- | bh20seqanalyzer/main.py | 134 |
1 files changed, 95 insertions, 39 deletions
diff --git a/bh20seqanalyzer/main.py b/bh20seqanalyzer/main.py index 9a36cae..1746587 100644 --- a/bh20seqanalyzer/main.py +++ b/bh20seqanalyzer/main.py @@ -20,26 +20,36 @@ def validate_upload(api, collection, validated_project, fastq_project, fastq_workflow_uuid): col = arvados.collection.Collection(collection["uuid"]) + if collection.get("status") in ("validated", "rejected"): + return False + # validate the collection here. Check metadata, etc. - valid = True + logging.info("Validating upload '%s' (%s)" % (collection["name"], collection["uuid"])) - if "metadata.yaml" not in col: - logging.warn("Upload '%s' missing metadata.yaml", collection["name"]) - valid = False - else: - try: - metadata_content = ruamel.yaml.round_trip_load(col.open("metadata.yaml")) - metadata_content["id"] = "http://arvados.org/keep:%s/metadata.yaml" % collection["portable_data_hash"] - sample_id = metadata_content["sample"]["sample_id"] - add_lc_filename(metadata_content, metadata_content["id"]) - valid = qc_metadata(metadata_content) and valid - except Exception as e: - logging.warn(e) - valid = False - if not valid: - logging.warn("Failed metadata qc") - - if valid: + errors = [] + + dup = api.collections().list(filters=[["owner_uuid", "=", validated_project], + ["portable_data_hash", "=", col.portable_data_hash()]]).execute() + if dup["items"]: + # This exact collection has been uploaded before. + errors.append("Duplicate of %s" % ([d["uuid"] for d in dup["items"]])) + + if not errors: + if "metadata.yaml" not in col: + errors.append("Missing metadata.yaml", collection["name"]) + else: + try: + metadata_content = ruamel.yaml.round_trip_load(col.open("metadata.yaml")) + metadata_content["id"] = "http://arvados.org/keep:%s/metadata.yaml" % collection["portable_data_hash"] + sample_id = metadata_content["sample"]["sample_id"] + add_lc_filename(metadata_content, metadata_content["id"]) + valid = qc_metadata(metadata_content) + if not valid: + errors.append("Failed metadata qc") + except Exception as e: + errors.append(str(e)) + + if not errors: try: tgt = None paired = {"reads_1.fastq": "reads.fastq", "reads_1.fastq.gz": "reads.fastq.gz"} @@ -49,36 +59,32 @@ def validate_upload(api, collection, validated_project, with col.open(n, 'rb') as qf: tgt = qc_fasta(qf)[0] if tgt != n and tgt != paired.get(n): - logging.info("Expected %s but magic says it should be %s", n, tgt) - valid = False + errors.append("Expected %s but magic says it should be %s", n, tgt) elif tgt in ("reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"): start_fastq_to_fasta(api, collection, fastq_project, fastq_workflow_uuid, n, sample_id) return False if tgt is None: - valid = False - logging.warn("Upload '%s' does not contain sequence.fasta, reads.fastq or reads_1.fastq", collection["name"]) - except ValueError as v: - valid = False + errors.append("Upload '%s' does not contain sequence.fasta, reads.fastq or reads_1.fastq", collection["name"]) + except Exception as v: + errors.append(str(v)) - dup = api.collections().list(filters=[["owner_uuid", "=", validated_project], - ["portable_data_hash", "=", col.portable_data_hash()]]).execute() - if dup["items"]: - # This exact collection has been uploaded before. - valid = False - logging.warn("Upload '%s' is duplicate" % collection["name"]) - if valid: + if not errors: logging.info("Added '%s' to validated sequences" % collection["name"]) # Move it to the "validated" project to be included in the next analysis + collection["properties"]["status"] = "validated" api.collections().update(uuid=collection["uuid"], body={ "owner_uuid": validated_project, "name": "%s (%s)" % (collection["name"], time.asctime(time.gmtime()))}).execute() + return True else: - # It is invalid, delete it. - logging.warn("Suggest deleting '%s'" % collection["name"]) - #api.collections().delete(uuid=collection["uuid"]).execute() - - return valid + # It is invalid + logging.warn("'%s' (%s) has validation errors: %s" % ( + collection["name"], collection["uuid"], "\n".join(errors))) + collection["properties"]["status"] = "rejected" + collection["properties"]["errors"] = errors + api.collections().update(uuid=collection["uuid"], body={"properties": collection["properties"]}).execute() + return False def run_workflow(api, parent_project, workflow_uuid, name, inputobj): @@ -229,6 +235,50 @@ def upload_schema(api, workflow_def_project): return "keep:%s/schema.yml" % pdh +def print_status(api, uploader_project, fmt): + pending = arvados.util.list_all(api.collections().list, filters=[["owner_uuid", "=", uploader_project]]) + out = [] + status = {} + for p in pending: + prop = p["properties"] + out.append(prop) + if "status" not in prop: + prop["status"] = "pending" + prop["created_at"] = p["created_at"] + prop["uuid"] = p["uuid"] + status[prop["status"]] = status.get(prop["status"], 0) + 1 + if fmt == "html": + print( +""" +<html> +<body> +""") + print("<p>Total collections in upload project %s</p>" % len(out)) + print("<p>Status %s</p>" % status) + print( +""" +<table> +<tr><th>Collection</th> +<th>Sequence label</th> +<th>Status</th> +<th>Errors</th></tr> +""") + for r in out: + print("<tr valign='top'>") + print("<td><a href='https://workbench.lugli.arvadosapi.com/collections/%s'>%s</a></td>" % (r["uuid"], r["uuid"])) + print("<td>%s</td>" % r["sequence_label"]) + print("<td>%s</td>" % r["status"]) + print("<td><pre>%s</pre></td>" % "\n".join(r.get("errors", []))) + print("</tr>") + print( +""" +</table> +</body> +</html> +""") + else: + print(json.dumps(out, indent=2)) + def main(): parser = argparse.ArgumentParser(description='Analyze collections uploaded to a project') parser.add_argument('--uploader-project', type=str, default='lugli-j7d0g-n5clictpuvwk8aa', help='') @@ -244,7 +294,9 @@ def main(): parser.add_argument('--latest-result-collection', type=str, default='lugli-4zz18-z513nlpqm03hpca', help='') parser.add_argument('--kickoff', action="store_true") + parser.add_argument('--no-start-analysis', action="store_true") parser.add_argument('--once', action="store_true") + parser.add_argument('--print-status', type=str, default=None) args = parser.parse_args() api = arvados.api() @@ -263,20 +315,24 @@ def main(): args.exclude_list) return + if args.print_status: + print_status(api, args.uploader_project, args.print_status) + exit(0) + logging.info("Starting up, monitoring %s for uploads" % (args.uploader_project)) while True: move_fastq_to_fasta_results(api, args.fastq_project, args.uploader_project) - new_collections = api.collections().list(filters=[['owner_uuid', '=', args.uploader_project]]).execute() + new_collections = arvados.util.list_all(api.collections().list, filters=[["owner_uuid", "=", args.uploader_project]]) at_least_one_new_valid_seq = False - for c in new_collections["items"]: + for c in new_collections: at_least_one_new_valid_seq = validate_upload(api, c, args.validated_project, args.fastq_project, args.fastq_workflow_uuid) or at_least_one_new_valid_seq - if at_least_one_new_valid_seq: + if at_least_one_new_valid_seq and not args.no_start_analysis: start_pangenome_analysis(api, args.pangenome_analysis_project, args.pangenome_workflow_uuid, |