aboutsummaryrefslogtreecommitdiff
path: root/bh20seqanalyzer/main.py
diff options
context:
space:
mode:
Diffstat (limited to 'bh20seqanalyzer/main.py')
-rw-r--r--bh20seqanalyzer/main.py134
1 files changed, 95 insertions, 39 deletions
diff --git a/bh20seqanalyzer/main.py b/bh20seqanalyzer/main.py
index 9a36cae..1746587 100644
--- a/bh20seqanalyzer/main.py
+++ b/bh20seqanalyzer/main.py
@@ -20,26 +20,36 @@ def validate_upload(api, collection, validated_project,
fastq_project, fastq_workflow_uuid):
col = arvados.collection.Collection(collection["uuid"])
+ if collection.get("status") in ("validated", "rejected"):
+ return False
+
# validate the collection here. Check metadata, etc.
- valid = True
+ logging.info("Validating upload '%s' (%s)" % (collection["name"], collection["uuid"]))
- if "metadata.yaml" not in col:
- logging.warn("Upload '%s' missing metadata.yaml", collection["name"])
- valid = False
- else:
- try:
- metadata_content = ruamel.yaml.round_trip_load(col.open("metadata.yaml"))
- metadata_content["id"] = "http://arvados.org/keep:%s/metadata.yaml" % collection["portable_data_hash"]
- sample_id = metadata_content["sample"]["sample_id"]
- add_lc_filename(metadata_content, metadata_content["id"])
- valid = qc_metadata(metadata_content) and valid
- except Exception as e:
- logging.warn(e)
- valid = False
- if not valid:
- logging.warn("Failed metadata qc")
-
- if valid:
+ errors = []
+
+ dup = api.collections().list(filters=[["owner_uuid", "=", validated_project],
+ ["portable_data_hash", "=", col.portable_data_hash()]]).execute()
+ if dup["items"]:
+ # This exact collection has been uploaded before.
+ errors.append("Duplicate of %s" % ([d["uuid"] for d in dup["items"]]))
+
+ if not errors:
+ if "metadata.yaml" not in col:
+ errors.append("Missing metadata.yaml", collection["name"])
+ else:
+ try:
+ metadata_content = ruamel.yaml.round_trip_load(col.open("metadata.yaml"))
+ metadata_content["id"] = "http://arvados.org/keep:%s/metadata.yaml" % collection["portable_data_hash"]
+ sample_id = metadata_content["sample"]["sample_id"]
+ add_lc_filename(metadata_content, metadata_content["id"])
+ valid = qc_metadata(metadata_content)
+ if not valid:
+ errors.append("Failed metadata qc")
+ except Exception as e:
+ errors.append(str(e))
+
+ if not errors:
try:
tgt = None
paired = {"reads_1.fastq": "reads.fastq", "reads_1.fastq.gz": "reads.fastq.gz"}
@@ -49,36 +59,32 @@ def validate_upload(api, collection, validated_project,
with col.open(n, 'rb') as qf:
tgt = qc_fasta(qf)[0]
if tgt != n and tgt != paired.get(n):
- logging.info("Expected %s but magic says it should be %s", n, tgt)
- valid = False
+ errors.append("Expected %s but magic says it should be %s", n, tgt)
elif tgt in ("reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"):
start_fastq_to_fasta(api, collection, fastq_project, fastq_workflow_uuid, n, sample_id)
return False
if tgt is None:
- valid = False
- logging.warn("Upload '%s' does not contain sequence.fasta, reads.fastq or reads_1.fastq", collection["name"])
- except ValueError as v:
- valid = False
+ errors.append("Upload '%s' does not contain sequence.fasta, reads.fastq or reads_1.fastq", collection["name"])
+ except Exception as v:
+ errors.append(str(v))
- dup = api.collections().list(filters=[["owner_uuid", "=", validated_project],
- ["portable_data_hash", "=", col.portable_data_hash()]]).execute()
- if dup["items"]:
- # This exact collection has been uploaded before.
- valid = False
- logging.warn("Upload '%s' is duplicate" % collection["name"])
- if valid:
+ if not errors:
logging.info("Added '%s' to validated sequences" % collection["name"])
# Move it to the "validated" project to be included in the next analysis
+ collection["properties"]["status"] = "validated"
api.collections().update(uuid=collection["uuid"], body={
"owner_uuid": validated_project,
"name": "%s (%s)" % (collection["name"], time.asctime(time.gmtime()))}).execute()
+ return True
else:
- # It is invalid, delete it.
- logging.warn("Suggest deleting '%s'" % collection["name"])
- #api.collections().delete(uuid=collection["uuid"]).execute()
-
- return valid
+ # It is invalid
+ logging.warn("'%s' (%s) has validation errors: %s" % (
+ collection["name"], collection["uuid"], "\n".join(errors)))
+ collection["properties"]["status"] = "rejected"
+ collection["properties"]["errors"] = errors
+ api.collections().update(uuid=collection["uuid"], body={"properties": collection["properties"]}).execute()
+ return False
def run_workflow(api, parent_project, workflow_uuid, name, inputobj):
@@ -229,6 +235,50 @@ def upload_schema(api, workflow_def_project):
return "keep:%s/schema.yml" % pdh
+def print_status(api, uploader_project, fmt):
+ pending = arvados.util.list_all(api.collections().list, filters=[["owner_uuid", "=", uploader_project]])
+ out = []
+ status = {}
+ for p in pending:
+ prop = p["properties"]
+ out.append(prop)
+ if "status" not in prop:
+ prop["status"] = "pending"
+ prop["created_at"] = p["created_at"]
+ prop["uuid"] = p["uuid"]
+ status[prop["status"]] = status.get(prop["status"], 0) + 1
+ if fmt == "html":
+ print(
+"""
+<html>
+<body>
+""")
+ print("<p>Total collections in upload project %s</p>" % len(out))
+ print("<p>Status %s</p>" % status)
+ print(
+"""
+<table>
+<tr><th>Collection</th>
+<th>Sequence label</th>
+<th>Status</th>
+<th>Errors</th></tr>
+""")
+ for r in out:
+ print("<tr valign='top'>")
+ print("<td><a href='https://workbench.lugli.arvadosapi.com/collections/%s'>%s</a></td>" % (r["uuid"], r["uuid"]))
+ print("<td>%s</td>" % r["sequence_label"])
+ print("<td>%s</td>" % r["status"])
+ print("<td><pre>%s</pre></td>" % "\n".join(r.get("errors", [])))
+ print("</tr>")
+ print(
+"""
+</table>
+</body>
+</html>
+""")
+ else:
+ print(json.dumps(out, indent=2))
+
def main():
parser = argparse.ArgumentParser(description='Analyze collections uploaded to a project')
parser.add_argument('--uploader-project', type=str, default='lugli-j7d0g-n5clictpuvwk8aa', help='')
@@ -244,7 +294,9 @@ def main():
parser.add_argument('--latest-result-collection', type=str, default='lugli-4zz18-z513nlpqm03hpca', help='')
parser.add_argument('--kickoff', action="store_true")
+ parser.add_argument('--no-start-analysis', action="store_true")
parser.add_argument('--once', action="store_true")
+ parser.add_argument('--print-status', type=str, default=None)
args = parser.parse_args()
api = arvados.api()
@@ -263,20 +315,24 @@ def main():
args.exclude_list)
return
+ if args.print_status:
+ print_status(api, args.uploader_project, args.print_status)
+ exit(0)
+
logging.info("Starting up, monitoring %s for uploads" % (args.uploader_project))
while True:
move_fastq_to_fasta_results(api, args.fastq_project, args.uploader_project)
- new_collections = api.collections().list(filters=[['owner_uuid', '=', args.uploader_project]]).execute()
+ new_collections = arvados.util.list_all(api.collections().list, filters=[["owner_uuid", "=", args.uploader_project]])
at_least_one_new_valid_seq = False
- for c in new_collections["items"]:
+ for c in new_collections:
at_least_one_new_valid_seq = validate_upload(api, c,
args.validated_project,
args.fastq_project,
args.fastq_workflow_uuid) or at_least_one_new_valid_seq
- if at_least_one_new_valid_seq:
+ if at_least_one_new_valid_seq and not args.no_start_analysis:
start_pangenome_analysis(api,
args.pangenome_analysis_project,
args.pangenome_workflow_uuid,