From 53ff8af0843942d83dff9fd5b95d1ae98e80fe27 Mon Sep 17 00:00:00 2001
From: Peter Amstutz
Date: Thu, 16 Jul 2020 11:48:35 -0400
Subject: Refactor analysis code into a class
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz
---
bh20seqanalyzer/main.py | 597 ++++++++++++++++++++++--------------------
bh20simplewebuploader/main.py | 28 +-
2 files changed, 326 insertions(+), 299 deletions(-)
diff --git a/bh20seqanalyzer/main.py b/bh20seqanalyzer/main.py
index 0b52e6b..f2bb234 100644
--- a/bh20seqanalyzer/main.py
+++ b/bh20seqanalyzer/main.py
@@ -16,277 +16,306 @@ logging.basicConfig(format="[%(asctime)s] %(levelname)s %(message)s", datefmt="%
level=logging.INFO)
logging.getLogger("googleapiclient.discovery").setLevel(logging.WARN)
-def validate_upload(api, collection, validated_project,
- fastq_project, fastq_workflow_uuid,
- revalidate):
- col = arvados.collection.Collection(collection["uuid"])
-
- if not revalidate and collection["properties"].get("status") in ("validated", "rejected"):
- return False
-
- # validate the collection here. Check metadata, etc.
- logging.info("Validating upload '%s' (%s)" % (collection["name"], collection["uuid"]))
-
- errors = []
-
- if collection["owner_uuid"] != validated_project:
- dup = api.collections().list(filters=[["owner_uuid", "=", validated_project],
- ["portable_data_hash", "=", col.portable_data_hash()]]).execute()
- if dup["items"]:
- # This exact collection has been uploaded before.
- errors.append("Duplicate of %s" % ([d["uuid"] for d in dup["items"]]))
-
- if not errors:
- if "metadata.yaml" not in col:
- errors.append("Missing metadata.yaml", collection["name"])
- else:
+class SeqAnalyzer:
+
+ def __init__(self, api, keepclient,
+ uploader_project,
+ pangenome_analysis_project,
+ fastq_project,
+ validated_project,
+ workflow_def_project,
+ pangenome_workflow_uuid,
+ fastq_workflow_uuid,
+ exclude_list,
+ latest_result_collection):
+ self.api = api
+ self.keepclient = keepclient
+ self.uploader_project = uploader_project
+ self.pangenome_analysis_project = pangenome_analysis_project
+ self.fastq_project = fastq_project
+ self.validated_project = validated_project
+ self.workflow_def_project = workflow_def_project
+ self.pangenome_workflow_uuid = pangenome_workflow_uuid
+ self.fastq_workflow_uuid = fastq_workflow_uuid
+ self.exclude_list = exclude_list
+ self.latest_result_uuid = latest_result_collection
+ self.schema_ref = None
+
+ def validate_upload(self, collection, revalidate):
+ col = arvados.collection.Collection(collection["uuid"], api_client=self.api, keep_client=self.keepclient)
+
+ if not revalidate and collection["properties"].get("status") in ("validated", "rejected"):
+ return False
+
+ # validate the collection here. Check metadata, etc.
+ logging.info("Validating upload '%s' (%s)" % (collection["name"], collection["uuid"]))
+
+ errors = []
+
+ if collection["owner_uuid"] != self.validated_project:
+ dup = self.api.collections().list(filters=[["owner_uuid", "=", self.validated_project],
+ ["portable_data_hash", "=", col.portable_data_hash()]]).execute()
+ if dup["items"]:
+ # This exact collection has been uploaded before.
+ errors.append("Duplicate of %s" % ([d["uuid"] for d in dup["items"]]))
+
+ if not errors:
+ if "metadata.yaml" not in col:
+ errors.append("Missing metadata.yaml", collection["name"])
+ else:
+ try:
+ with col.open("metadata.yaml") as md:
+ metadata_content = ruamel.yaml.round_trip_load(md)
+ metadata_content["id"] = "http://arvados.org/keep:%s/metadata.yaml" % collection["portable_data_hash"]
+ sample_id = metadata_content["sample"]["sample_id"]
+ add_lc_filename(metadata_content, metadata_content["id"])
+ valid = qc_metadata(metadata_content)
+ if not valid:
+ errors.append("Failed metadata qc")
+ except Exception as e:
+ errors.append(str(e))
+
+ if not errors:
try:
- metadata_content = ruamel.yaml.round_trip_load(col.open("metadata.yaml"))
- metadata_content["id"] = "http://arvados.org/keep:%s/metadata.yaml" % collection["portable_data_hash"]
- sample_id = metadata_content["sample"]["sample_id"]
- add_lc_filename(metadata_content, metadata_content["id"])
- valid = qc_metadata(metadata_content)
- if not valid:
- errors.append("Failed metadata qc")
- except Exception as e:
- errors.append(str(e))
-
- if not errors:
- try:
- tgt = None
- paired = {"reads_1.fastq": "reads.fastq", "reads_1.fastq.gz": "reads.fastq.gz"}
- for n in ("sequence.fasta", "reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"):
- if n not in col:
- continue
- with col.open(n, 'rb') as qf:
- tgt = qc_fasta(qf)[0]
- if tgt != n and tgt != paired.get(n):
- errors.append("Expected %s but magic says it should be %s", n, tgt)
- elif tgt in ("reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"):
- start_fastq_to_fasta(api, collection, fastq_project, fastq_workflow_uuid, n, sample_id)
- return False
- if tgt is None:
- errors.append("Upload '%s' does not contain sequence.fasta, reads.fastq or reads_1.fastq", collection["name"])
- except Exception as v:
- errors.append(str(v))
-
-
- if not errors:
- # Move it to the "validated" project to be included in the next analysis
- if "errors" in collection["properties"]:
- del collection["properties"]["errors"]
- collection["properties"]["status"] = "validated"
- api.collections().update(uuid=collection["uuid"], body={
- "owner_uuid": validated_project,
- "name": "%s (%s)" % (collection["name"], time.asctime(time.gmtime())),
- "properties": collection["properties"]}).execute()
- logging.info("Added '%s' to validated sequences" % collection["name"])
- return True
- else:
- # It is invalid
- logging.warn("'%s' (%s) has validation errors: %s" % (
- collection["name"], collection["uuid"], "\n".join(errors)))
- collection["properties"]["status"] = "rejected"
- collection["properties"]["errors"] = errors
- api.collections().update(uuid=collection["uuid"], body={"properties": collection["properties"]}).execute()
- return False
-
-
-def run_workflow(api, parent_project, workflow_uuid, name, inputobj):
- project = api.groups().create(body={
- "group_class": "project",
- "name": name,
- "owner_uuid": parent_project,
- }, ensure_unique_name=True).execute()
-
- with tempfile.NamedTemporaryFile() as tmp:
- tmp.write(json.dumps(inputobj, indent=2).encode('utf-8'))
- tmp.flush()
- cmd = ["arvados-cwl-runner",
- "--submit",
- "--no-wait",
- "--project-uuid=%s" % project["uuid"],
- "arvwf:%s" % workflow_uuid,
- tmp.name]
- logging.info("Running %s" % ' '.join(cmd))
- comp = subprocess.run(cmd, capture_output=True)
- logging.info("Submitted %s", comp.stdout)
- if comp.returncode != 0:
- logging.error(comp.stderr.decode('utf-8'))
-
- return project
-
-
-def start_fastq_to_fasta(api, collection,
- analysis_project,
- fastq_workflow_uuid,
- tgt,
- sample_id):
-
- params = {
- "metadata": {
- "class": "File",
- "location": "keep:%s/metadata.yaml" % collection["portable_data_hash"]
- },
- "ref_fasta": {
- "class": "File",
- "location": "keep:ffef6a3b77e5e04f8f62a7b6f67264d1+556/SARS-CoV2-NC_045512.2.fasta"
- },
- "sample_id": sample_id
- }
-
- if tgt.startswith("reads.fastq"):
- params["fastq_forward"] = {
- "class": "File",
- "location": "keep:%s/%s" % (collection["portable_data_hash"], tgt)
- }
- elif tgt.startswith("reads_1.fastq"):
- params["fastq_forward"] = {
- "class": "File",
- "location": "keep:%s/reads_1.%s" % (collection["portable_data_hash"], tgt[8:])
- }
- params["fastq_reverse"] = {
- "class": "File",
- "location": "keep:%s/reads_2.%s" % (collection["portable_data_hash"], tgt[8:])
+ tgt = None
+ paired = {"reads_1.fastq": "reads.fastq", "reads_1.fastq.gz": "reads.fastq.gz"}
+ for n in ("sequence.fasta", "reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"):
+ if n not in col:
+ continue
+ with col.open(n, 'rb') as qf:
+ tgt = qc_fasta(qf)[0]
+ if tgt != n and tgt != paired.get(n):
+ errors.append("Expected %s but magic says it should be %s", n, tgt)
+ elif tgt in ("reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"):
+ self.start_fastq_to_fasta(collection, n, sample_id)
+ return False
+ if tgt is None:
+ errors.append("Upload '%s' does not contain sequence.fasta, reads.fastq or reads_1.fastq", collection["name"])
+ except Exception as v:
+ errors.append(str(v))
+
+
+ if not errors:
+ # Move it to the "validated" project to be included in the next analysis
+ if "errors" in collection["properties"]:
+ del collection["properties"]["errors"]
+ collection["properties"]["status"] = "validated"
+ self.api.collections().update(uuid=collection["uuid"], body={
+ "owner_uuid": self.validated_project,
+ "name": "%s (%s)" % (collection["name"], time.asctime(time.gmtime())),
+ "properties": collection["properties"]}).execute()
+ logging.info("Added '%s' to validated sequences" % collection["name"])
+ return True
+ else:
+ # It is invalid
+ logging.warn("'%s' (%s) has validation errors: %s" % (
+ collection["name"], collection["uuid"], "\n".join(errors)))
+ collection["properties"]["status"] = "rejected"
+ collection["properties"]["errors"] = errors
+ self.api.collections().update(uuid=collection["uuid"], body={"properties": collection["properties"]}).execute()
+ return False
+
+
+ def run_workflow(self, parent_project, workflow_uuid, name, inputobj):
+ project = self.api.groups().create(body={
+ "group_class": "project",
+ "name": name,
+ "owner_uuid": parent_project,
+ }, ensure_unique_name=True).execute()
+
+ with tempfile.NamedTemporaryFile() as tmp:
+ tmp.write(json.dumps(inputobj, indent=2).encode('utf-8'))
+ tmp.flush()
+ cmd = ["arvados-cwl-runner",
+ "--submit",
+ "--no-wait",
+ "--project-uuid=%s" % project["uuid"],
+ "arvwf:%s" % workflow_uuid,
+ tmp.name]
+ logging.info("Running %s" % ' '.join(cmd))
+ comp = subprocess.run(cmd, capture_output=True)
+ logging.info("Submitted %s", comp.stdout)
+ if comp.returncode != 0:
+ logging.error(comp.stderr.decode('utf-8'))
+
+ return project
+
+
+ def start_fastq_to_fasta(self, collection,
+ tgt,
+ sample_id):
+
+ params = {
+ "metadata": {
+ "class": "File",
+ "location": "keep:%s/metadata.yaml" % collection["portable_data_hash"]
+ },
+ "ref_fasta": {
+ "class": "File",
+ "location": "keep:ffef6a3b77e5e04f8f62a7b6f67264d1+556/SARS-CoV2-NC_045512.2.fasta"
+ },
+ "sample_id": sample_id
}
- newproject = run_workflow(api, analysis_project, fastq_workflow_uuid, "FASTQ to FASTA", params)
- api.collections().update(uuid=collection["uuid"],
- body={"owner_uuid": newproject["uuid"]}).execute()
-
-def start_pangenome_analysis(api,
- analysis_project,
- pangenome_workflow_uuid,
- validated_project,
- schema_ref,
- exclude_list):
- validated = arvados.util.list_all(api.collections().list, filters=[
- ["owner_uuid", "=", validated_project],
- ["properties.status", "=", "validated"]])
- inputobj = {
- "inputReads": [],
- "metadata": [],
- "subjects": [],
- "metadataSchema": {
- "class": "File",
- "location": schema_ref
- },
- "exclude": {
- "class": "File",
- "location": exclude_list
+ if tgt.startswith("reads.fastq"):
+ params["fastq_forward"] = {
+ "class": "File",
+ "location": "keep:%s/%s" % (collection["portable_data_hash"], tgt)
+ }
+ elif tgt.startswith("reads_1.fastq"):
+ params["fastq_forward"] = {
+ "class": "File",
+ "location": "keep:%s/reads_1.%s" % (collection["portable_data_hash"], tgt[8:])
+ }
+ params["fastq_reverse"] = {
+ "class": "File",
+ "location": "keep:%s/reads_2.%s" % (collection["portable_data_hash"], tgt[8:])
+ }
+
+ newproject = self.run_workflow(self.fastq_project, self.fastq_workflow_uuid, "FASTQ to FASTA", params)
+ self.api.collections().update(uuid=collection["uuid"],
+ body={"owner_uuid": newproject["uuid"]}).execute()
+
+ def start_pangenome_analysis(self):
+
+ if self.schema_ref is None:
+ self.upload_schema()
+
+ validated = arvados.util.list_all(self.api.collections().list, filters=[
+ ["owner_uuid", "=", self.validated_project],
+ ["properties.status", "=", "validated"]])
+ inputobj = {
+ "inputReads": [],
+ "metadata": [],
+ "subjects": [],
+ "metadataSchema": {
+ "class": "File",
+ "location": self.schema_ref
+ },
+ "exclude": {
+ "class": "File",
+ "location": self.exclude_list
+ }
}
- }
- validated.sort(key=lambda v: v["portable_data_hash"])
- for v in validated:
- inputobj["inputReads"].append({
- "class": "File",
- "location": "keep:%s/sequence.fasta" % v["portable_data_hash"]
- })
- inputobj["metadata"].append({
- "class": "File",
- "location": "keep:%s/metadata.yaml" % v["portable_data_hash"]
- })
- inputobj["subjects"].append("http://collections.lugli.arvadosapi.com/c=%s/sequence.fasta" % v["portable_data_hash"])
- run_workflow(api, analysis_project, pangenome_workflow_uuid, "Pangenome analysis", inputobj)
-
-
-def get_workflow_output_from_project(api, uuid):
- cr = api.container_requests().list(filters=[['owner_uuid', '=', uuid],
- ["requesting_container_uuid", "=", None]]).execute()
- if cr["items"] and cr["items"][0]["output_uuid"]:
- container = api.containers().get(uuid=cr["items"][0]["container_uuid"]).execute()
- if container["state"] == "Complete" and container["exit_code"] == 0:
- return cr["items"][0]
- return None
-
-
-def copy_most_recent_result(api, analysis_project, latest_result_uuid):
- most_recent_analysis = api.groups().list(filters=[['owner_uuid', '=', analysis_project]],
- order="created_at desc").execute()
- for m in most_recent_analysis["items"]:
- wf = get_workflow_output_from_project(api, m["uuid"])
- if wf:
- src = api.collections().get(uuid=wf["output_uuid"]).execute()
- dst = api.collections().get(uuid=latest_result_uuid).execute()
- if src["portable_data_hash"] != dst["portable_data_hash"]:
- logging.info("Copying latest result from '%s' to %s", m["name"], latest_result_uuid)
- api.collections().update(uuid=latest_result_uuid,
- body={"manifest_text": src["manifest_text"],
- "description": "Result from %s %s" % (m["name"], wf["uuid"])}).execute()
- break
-
+ validated.sort(key=lambda v: v["portable_data_hash"])
+ for v in validated:
+ inputobj["inputReads"].append({
+ "class": "File",
+ "location": "keep:%s/sequence.fasta" % v["portable_data_hash"]
+ })
+ inputobj["metadata"].append({
+ "class": "File",
+ "location": "keep:%s/metadata.yaml" % v["portable_data_hash"]
+ })
+ inputobj["subjects"].append("http://collections.lugli.arvadosapi.com/c=%s/sequence.fasta" % v["portable_data_hash"])
+ self.run_workflow(self.pangenome_analysis_project, self.pangenome_workflow_uuid, "Pangenome analysis", inputobj)
+
+
+ def get_workflow_output_from_project(self, uuid):
+ cr = self.api.container_requests().list(filters=[['owner_uuid', '=', uuid],
+ ["requesting_container_uuid", "=", None]]).execute()
+ if cr["items"] and cr["items"][0]["output_uuid"]:
+ container = self.api.containers().get(uuid=cr["items"][0]["container_uuid"]).execute()
+ if container["state"] == "Complete" and container["exit_code"] == 0:
+ return cr["items"][0]
+ return None
+
+
+ def copy_most_recent_result(self):
+ most_recent_analysis = self.api.groups().list(filters=[['owner_uuid', '=', self.pangenome_analysis_project]],
+ order="created_at desc").execute()
+ for m in most_recent_analysis["items"]:
+ wf = self.get_workflow_output_from_project(m["uuid"])
+ if wf:
+ src = self.api.collections().get(uuid=wf["output_uuid"]).execute()
+ dst = self.api.collections().get(uuid=self.latest_result_uuid).execute()
+ if src["portable_data_hash"] != dst["portable_data_hash"]:
+ logging.info("Copying latest result from '%s' to %s", m["name"], self.latest_result_uuid)
+ self.api.collections().update(uuid=self.latest_result_uuid,
+ body={"manifest_text": src["manifest_text"],
+ "description": "Result from %s %s" % (m["name"], wf["uuid"])}).execute()
+ break
+
+
+ def move_fastq_to_fasta_results(self):
+ projects = self.api.groups().list(filters=[['owner_uuid', '=', self.fastq_project],
+ ["properties.moved_output", "!=", True]],
+ order="created_at desc",).execute()
+ for p in projects["items"]:
+ wf = self.get_workflow_output_from_project(p["uuid"])
+ if not wf:
+ continue
-def move_fastq_to_fasta_results(api, analysis_project, uploader_project):
- projects = api.groups().list(filters=[['owner_uuid', '=', analysis_project],
- ["properties.moved_output", "!=", True]],
- order="created_at desc",).execute()
- for p in projects["items"]:
- wf = get_workflow_output_from_project(api, p["uuid"])
- if wf:
logging.info("Moving completed fastq2fasta result %s back to uploader project", wf["output_uuid"])
- api.collections().update(uuid=wf["output_uuid"],
- body={"owner_uuid": uploader_project}).execute()
+ self.api.collections().update(uuid=wf["output_uuid"],
+ body={"owner_uuid": self.uploader_project}).execute()
+
+ col = arvados.collection.Collection(wf["output_uuid"], api_client=self.api, keep_client=self.keepclient)
+ with col.open("metadata.yaml") as md:
+ metadata_content = ruamel.yaml.round_trip_load(md)
+
p["properties"]["moved_output"] = True
- api.groups().update(uuid=p["uuid"], body={"properties": p["properties"]}).execute()
+ p["properties"]["sequence_label"] = metadata_content["sample"]["sample_id"]
+ self.api.groups().update(uuid=p["uuid"], body={"properties": p["properties"]}).execute()
break
-def upload_schema(api, workflow_def_project):
- schema_resource = pkg_resources.resource_stream('bh20sequploader.qc_metadata', "bh20seq-schema.yml")
- c = arvados.collection.Collection()
- with c.open("schema.yml", "wb") as f:
- f.write(schema_resource.read())
- pdh = c.portable_data_hash()
- wd = api.collections().list(filters=[["owner_uuid", "=", workflow_def_project],
- ["portable_data_hash", "=", pdh]]).execute()
- if len(wd["items"]) == 0:
- c.save_new(owner_uuid=workflow_def_project, name="Metadata schema", ensure_unique_name=True)
- return "keep:%s/schema.yml" % pdh
-
-
-def print_status(api, uploader_project, fmt):
- pending = arvados.util.list_all(api.collections().list, filters=[["owner_uuid", "=", uploader_project]])
- out = []
- status = {}
- for p in pending:
- prop = p["properties"]
- out.append(prop)
- if "status" not in prop:
- prop["status"] = "pending"
- prop["created_at"] = p["created_at"]
- prop["uuid"] = p["uuid"]
- status[prop["status"]] = status.get(prop["status"], 0) + 1
- if fmt == "html":
- print(
-"""
-
-
-""")
- print("Total collections in upload project %s
" % len(out))
- print("Status %s
" % status)
- print(
-"""
-
-Collection |
-Sequence label |
-Status |
-Errors |
-""")
- for r in out:
- print("")
- print("%s | " % (r["uuid"], r["uuid"]))
- print("%s | " % r["sequence_label"])
- print("%s | " % r["status"])
- print("%s | " % "\n".join(r.get("errors", [])))
- print("
")
- print(
-"""
-
-
-
-""")
- else:
- print(json.dumps(out, indent=2))
+ def upload_schema(self):
+ schema_resource = pkg_resources.resource_stream('bh20sequploader.qc_metadata', "bh20seq-schema.yml")
+ c = arvados.collection.Collection(api_client=self.api, keep_client=self.keepclient)
+ with c.open("schema.yml", "wb") as f:
+ f.write(schema_resource.read())
+ pdh = c.portable_data_hash()
+ wd = self.api.collections().list(filters=[["owner_uuid", "=", self.workflow_def_project],
+ ["portable_data_hash", "=", pdh]]).execute()
+ if len(wd["items"]) == 0:
+ c.save_new(owner_uuid=self.workflow_def_project, name="Metadata schema", ensure_unique_name=True)
+ self.schema_ref = "keep:%s/schema.yml" % pdh
+
+
+ def print_status(self, fmt):
+ pending = arvados.util.list_all(self.api.collections().list, filters=[["owner_uuid", "=", self.uploader_project]])
+ out = []
+ status = {}
+ for p in pending:
+ prop = p["properties"]
+ out.append(prop)
+ if "status" not in prop:
+ prop["status"] = "pending"
+ prop["created_at"] = p["created_at"]
+ prop["uuid"] = p["uuid"]
+ status[prop["status"]] = status.get(prop["status"], 0) + 1
+ if fmt == "html":
+ print(
+ """
+
+
+ """)
+ print("Total collections in upload project %s
" % len(out))
+ print("Status %s
" % status)
+ print(
+ """
+
+ Collection |
+ Sequence label |
+ Status |
+ Errors |
+ """)
+ for r in out:
+ print("")
+ print("%s | " % (r["uuid"], r["uuid"]))
+ print("%s | " % r["sequence_label"])
+ print("%s | " % r["status"])
+ print("%s | " % "\n".join(r.get("errors", [])))
+ print("
")
+ print(
+ """
+
+
+
+ """)
+ else:
+ print(json.dumps(out, indent=2))
def main():
parser = argparse.ArgumentParser(description='Analyze collections uploaded to a project')
@@ -310,50 +339,42 @@ def main():
args = parser.parse_args()
api = arvados.api()
-
-
-
- schema_ref = upload_schema(api, args.workflow_def_project)
+ keepclient = arvados.keep.KeepClient(api_client=api)
+
+ seqanalyzer = SeqAnalyzer(api, keepclient,
+ args.uploader_project,
+ args.pangenome_analysis_project,
+ args.fastq_project,
+ args.validated_project,
+ args.workflow_def_project,
+ args.pangenome_workflow_uuid,
+ args.fastq_workflow_uuid,
+ args.exclude_list,
+ args.latest_result_collection)
if args.kickoff:
logging.info("Starting a single analysis run")
- start_pangenome_analysis(api,
- args.pangenome_analysis_project,
- args.pangenome_workflow_uuid,
- args.validated_project,
- schema_ref,
- args.exclude_list)
+ seqanalyzer.start_pangenome_analysis()
return
if args.print_status:
- print_status(api, args.uploader_project, args.print_status)
+ seqanalyzer.print_status(args.print_status)
exit(0)
logging.info("Starting up, monitoring %s for uploads" % (args.uploader_project))
while True:
- move_fastq_to_fasta_results(api, args.fastq_project, args.uploader_project)
+ seqanalyzer.move_fastq_to_fasta_results()
new_collections = arvados.util.list_all(api.collections().list, filters=[["owner_uuid", "=", args.uploader_project]])
at_least_one_new_valid_seq = False
for c in new_collections:
- at_least_one_new_valid_seq = validate_upload(api, c,
- args.validated_project,
- args.fastq_project,
- args.fastq_workflow_uuid,
- args.revalidate) or at_least_one_new_valid_seq
+ at_least_one_new_valid_seq = seqanalyzer.validate_upload(c, args.revalidate) or at_least_one_new_valid_seq
if at_least_one_new_valid_seq and not args.no_start_analysis:
- start_pangenome_analysis(api,
- args.pangenome_analysis_project,
- args.pangenome_workflow_uuid,
- args.validated_project,
- schema_ref,
- args.exclude_list)
-
- copy_most_recent_result(api,
- args.pangenome_analysis_project,
- args.latest_result_collection)
+ seqanalyzer.start_pangenome_analysis()
+
+ seqanalyzer.copy_most_recent_result()
if args.once:
break
diff --git a/bh20simplewebuploader/main.py b/bh20simplewebuploader/main.py
index 8089883..3173d60 100644
--- a/bh20simplewebuploader/main.py
+++ b/bh20simplewebuploader/main.py
@@ -479,10 +479,13 @@ def pending_table(output, items):
for r in items:
if r["status"] != "pending":
continue
- output.write("")
- output.write("%s | " % (r["uuid"], r["uuid"]))
- output.write("%s | " % Markup.escape(r["sequence_label"]))
- output.write("
")
+ try:
+ output.write("")
+ output.write("%s | " % (r["uuid"], r["uuid"]))
+ output.write("%s | " % Markup.escape(r.get("sequence_label")))
+ output.write("
")
+ except Exception:
+ pass
output.write(
"""
@@ -497,13 +500,16 @@ def rejected_table(output, items):
Errors |
""")
for r in items:
- if r["status"] != "rejected":
- continue
- output.write("")
- output.write("%s | " % (r["uuid"], r["uuid"]))
- output.write("%s | " % Markup.escape(r["sequence_label"]))
- output.write("%s | " % Markup.escape("\n".join(r.get("errors", []))))
- output.write("
")
+ try:
+ if r["status"] != "rejected":
+ continue
+ output.write("")
+ output.write("%s | " % (r["uuid"], r["uuid"]))
+ output.write("%s | " % Markup.escape(r.get("sequence_label")))
+ output.write("%s | " % Markup.escape("\n".join(r.get("errors", []))))
+ output.write("
")
+ except Exception:
+ pass
output.write(
"""
--
cgit v1.2.3
From a10569f51072569604b2384e6e4d583b36de73c4 Mon Sep 17 00:00:00 2001
From: Peter Amstutz
Date: Thu, 16 Jul 2020 12:01:38 -0400
Subject: Make license optional for now
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz
---
bh20sequploader/bh20seq-schema.yml | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/bh20sequploader/bh20seq-schema.yml b/bh20sequploader/bh20seq-schema.yml
index ee852fa..0aead3b 100644
--- a/bh20sequploader/bh20seq-schema.yml
+++ b/bh20sequploader/bh20seq-schema.yml
@@ -16,7 +16,7 @@ $graph:
fields:
license_type:
doc: License types as defined in https://wiki.creativecommons.org/images/d/d6/Ccrel-1.0.pdf
- type: string?
+ type: string
jsonldPredicate:
_id: https://creativecommons.org/ns#License
title:
@@ -264,7 +264,7 @@ $graph:
virus: virusSchema
technology: technologySchema
submitter: submitterSchema
- license: licenseSchema
+ license: ["null", licenseSchema]
id:
doc: The subject (eg the fasta/fastq file) that the metadata describes
type: string
--
cgit v1.2.3
From 0e84b18cb134855d572d1f94d5d3c43571afe7e9 Mon Sep 17 00:00:00 2001
From: Peter Amstutz
Date: Thu, 16 Jul 2020 12:04:26 -0400
Subject: Make license optional
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz
---
bh20sequploader/bh20seq-shex.rdf | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/bh20sequploader/bh20seq-shex.rdf b/bh20sequploader/bh20seq-shex.rdf
index 7331e86..bbc7309 100644
--- a/bh20sequploader/bh20seq-shex.rdf
+++ b/bh20sequploader/bh20seq-shex.rdf
@@ -17,7 +17,7 @@ PREFIX wikidata:
MainSchema:submitter @:submitterShape ;
MainSchema:technology @:technologyShape ;
MainSchema:virus @:virusShape;
- MainSchema:license @:licenseShape;
+ MainSchema:license @:licenseShape ?;
}
:hostShape {
@@ -71,7 +71,7 @@ PREFIX wikidata:
}
:licenseShape{
- cc:License xsd:string ?;
+ cc:License xsd:string ;
dc:Title xsd:string ?;
cc:attributionName xsd:string ?;
cc:attributionURL xsd:string ?;
--
cgit v1.2.3
From 6bfefe984a84fb215d61e045c49a4ab123bb7339 Mon Sep 17 00:00:00 2001
From: Peter Amstutz
Date: Thu, 16 Jul 2020 12:32:43 -0400
Subject: Catch exceptions
Add script to cleanup bad uploads.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz
---
bh20seqanalyzer/main.py | 19 +++++++++++--------
scripts/cleanup.py | 20 ++++++++++++++++++++
2 files changed, 31 insertions(+), 8 deletions(-)
create mode 100644 scripts/cleanup.py
diff --git a/bh20seqanalyzer/main.py b/bh20seqanalyzer/main.py
index f2bb234..f18a93a 100644
--- a/bh20seqanalyzer/main.py
+++ b/bh20seqanalyzer/main.py
@@ -364,17 +364,20 @@ def main():
logging.info("Starting up, monitoring %s for uploads" % (args.uploader_project))
while True:
- seqanalyzer.move_fastq_to_fasta_results()
+ try:
+ seqanalyzer.move_fastq_to_fasta_results()
- new_collections = arvados.util.list_all(api.collections().list, filters=[["owner_uuid", "=", args.uploader_project]])
- at_least_one_new_valid_seq = False
- for c in new_collections:
- at_least_one_new_valid_seq = seqanalyzer.validate_upload(c, args.revalidate) or at_least_one_new_valid_seq
+ new_collections = arvados.util.list_all(api.collections().list, filters=[["owner_uuid", "=", args.uploader_project]])
+ at_least_one_new_valid_seq = False
+ for c in new_collections:
+ at_least_one_new_valid_seq = seqanalyzer.validate_upload(c, args.revalidate) or at_least_one_new_valid_seq
- if at_least_one_new_valid_seq and not args.no_start_analysis:
- seqanalyzer.start_pangenome_analysis()
+ if at_least_one_new_valid_seq and not args.no_start_analysis:
+ seqanalyzer.start_pangenome_analysis()
- seqanalyzer.copy_most_recent_result()
+ seqanalyzer.copy_most_recent_result()
+ except Exception as e:
+ logging.exception("Error in main loop")
if args.once:
break
diff --git a/scripts/cleanup.py b/scripts/cleanup.py
new file mode 100644
index 0000000..f4bd0b4
--- /dev/null
+++ b/scripts/cleanup.py
@@ -0,0 +1,20 @@
+import arvados
+import arvados.util
+
+api = arvados.api()
+
+patterns = [
+ "%missing%`collection_location`%",
+ "%missing%`technology`%",
+ "%missing%`host_species`%",
+ "%QC fail: alignment%",
+ "%does not look like a valid URI%",
+ ]
+
+for p in patterns:
+ c = arvados.util.list_all(api.collections().list, filters=[
+ ["owner_uuid", "=", "lugli-j7d0g-n5clictpuvwk8aa"],
+ ["properties.errors", "like", p]])
+ for i in c:
+ print("trashing %s %s" % (i["uuid"], i["properties"].get("sequence_label")))
+ api.collections().delete(uuid=i["uuid"]).execute()
--
cgit v1.2.3
From 15624e038e0f368d2be4c9a76ace77da4d673fdd Mon Sep 17 00:00:00 2001
From: Peter Amstutz
Date: Thu, 16 Jul 2020 14:21:40 -0400
Subject: Improve upload form layout.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz
---
bh20simplewebuploader/main.py | 86 ++++++++++++++++-------------
bh20simplewebuploader/static/main.css | 17 ++++--
bh20simplewebuploader/templates/footer.html | 5 +-
3 files changed, 64 insertions(+), 44 deletions(-)
diff --git a/bh20simplewebuploader/main.py b/bh20simplewebuploader/main.py
index 3173d60..62b68d9 100644
--- a/bh20simplewebuploader/main.py
+++ b/bh20simplewebuploader/main.py
@@ -8,7 +8,7 @@ import os
import sys
import re
import string
-import yaml
+import ruamel.yaml as yaml
import pkg_resources
from flask import Flask, request, redirect, send_file, send_from_directory, render_template, jsonify
import os.path
@@ -16,6 +16,9 @@ import requests
import io
import arvados
from markupsafe import Markup
+from schema_salad.sourceline import add_lc_filename
+from schema_salad.schema import shortname
+from typing import MutableSequence, MutableMapping
ARVADOS_API = 'lugli.arvadosapi.com'
ANONYMOUS_TOKEN = '5o42qdxpxp5cj15jqjf7vnxx5xduhm4ret703suuoa3ivfglfh'
@@ -47,6 +50,8 @@ def type_to_heading(type_name):
Turn a type name like "sampleSchema" from the metadata schema into a human-readable heading.
"""
+ type_name = shortname(type_name)
+
print(type_name,file=sys.stderr)
# Remove camel case
decamel = re.sub('([A-Z])', r' \1', type_name)
@@ -78,7 +83,7 @@ def is_iri(string):
return string.startswith('http')
-def generate_form(schema, options):
+def generate_form(components, options):
"""
Linearize the schema into a list of dicts.
@@ -101,9 +106,6 @@ def generate_form(schema, options):
IRI.
"""
- # Get the list of form components, one of which is the root
- components = schema.get('$graph', [])
-
# Find the root
root_name = None
# And also index components by type name
@@ -131,14 +133,25 @@ def generate_form(schema, options):
# First make a heading, if we aren't the very root of the form
yield {'heading': type_to_heading(type_name)}
- for field_name, field_type in by_name.get(type_name, {}).get('fields', {}).items():
+ for field in by_name.get(type_name, {}).get('fields', []):
+ field_name = shortname(field["name"])
+ field_type = field["type"]
# For each field
ref_iri = None
docstring = None
- if not isinstance(field_type, str):
- # If the type isn't a string
+ optional = False
+ is_list = False
+
+ if isinstance(field_type, MutableSequence):
+ if field_type[0] == "null" and len(field_type) == 2:
+ optional = True
+ field_type = field_type[1]
+ else:
+ raise Exception("Can't handle it")
+
+ if isinstance(field_type, MutableMapping):
# It may have documentation
docstring = field_type.get('doc', None)
@@ -161,25 +174,13 @@ def generate_form(schema, options):
ref_iri = field_value
break
-
- # Now overwrite the field type with the actual type string
- field_type = field_type.get('type', '')
-
- # Decide if the field is optional (type ends in ?)
- optional = False
- if field_type.endswith('?'):
- # It's optional
- optional = True
- # Drop the ?
- field_type = field_type[:-1]
-
- # Decide if the field is a list (type ends in [])
- is_list = False
- if field_type.endswith('[]'):
- # It's a list
- is_list = True
- # Reduce to the normal type
- field_type = field_type[:-2]
+ if field_type["type"] == "array":
+ # Now replace the field type with the actual type string
+ is_list = True
+ field_type = field_type.get('items', '')
+ else:
+ field_type = field_type.get('type', '')
+ pass
if field_type in by_name:
# This is a subrecord. We need to recurse
@@ -227,15 +228,24 @@ def generate_form(schema, options):
return list(walk_fields(root_name))
-# At startup, we need to load the metadata schema from the uploader module, so we can make a form for it
-if os.path.isfile("bh20sequploader/bh20seq-schema.yml"):
- METADATA_SCHEMA = yaml.safe_load(open("bh20sequploader/bh20seq-schema.yml","r").read())
- METADATA_OPTION_DEFINITIONS = yaml.safe_load(open("bh20sequploader/bh20seq-options.yml","r").read())
-else:
- METADATA_SCHEMA = yaml.safe_load(pkg_resources.resource_stream("bh20sequploader", "bh20seq-schema.yml"))
- METADATA_OPTION_DEFINITIONS = yaml.safe_load(pkg_resources.resource_stream("bh20sequploader", "bh20seq-options.yml"))
-# print(METADATA_SCHEMA,file=sys.stderr)
-FORM_ITEMS = generate_form(METADATA_SCHEMA, METADATA_OPTION_DEFINITIONS)
+import schema_salad.schema
+def load_schema_generate_form():
+ # At startup, we need to load the metadata schema from the uploader module, so we can make a form for it
+ if os.path.isfile("bh20sequploader/bh20seq-schema.yml"):
+ METADATA_SCHEMA = yaml.round_trip_load(open("bh20sequploader/bh20seq-schema.yml","r").read())
+ METADATA_OPTION_DEFINITIONS = yaml.safe_load(open("bh20sequploader/bh20seq-options.yml","r").read())
+ else:
+ METADATA_SCHEMA = yaml.round_trip_load(pkg_resources.resource_stream("bh20sequploader", "bh20seq-schema.yml"))
+ METADATA_OPTION_DEFINITIONS = yaml.safe_load(pkg_resources.resource_stream("bh20sequploader", "bh20seq-options.yml"))
+
+ METADATA_SCHEMA["name"] = "bh20seq-schema.yml"
+ add_lc_filename(METADATA_SCHEMA, "bh20seq-schema.yml")
+ metaschema_names, _metaschema_doc, metaschema_loader = schema_salad.schema.get_metaschema()
+ schema_doc, schema_metadata = metaschema_loader.resolve_ref(METADATA_SCHEMA, "")
+
+ return generate_form(schema_doc, METADATA_OPTION_DEFINITIONS)
+
+FORM_ITEMS = load_schema_generate_form()
@app.route('/')
def send_home():
@@ -543,10 +553,10 @@ def status_page():
for s in (("passed", "/download"), ("pending", "#pending"), ("rejected", "#rejected")):
output.write("<p><a href=\"%s\">%s sequences QC %s</a></p>\n" % (s[1], status.get(s[0], 0), s[0]))
- output.write("Pending
")
+ output.write("Pending
")
pending_table(output, out)
- output.write("Rejected
")
+ output.write("Rejected
")
rejected_table(output, out)
return render_template('status.html', table=Markup(output.getvalue()), menu='STATUS')
diff --git a/bh20simplewebuploader/static/main.css b/bh20simplewebuploader/static/main.css
index 47fb408..b28ee9c 100644
--- a/bh20simplewebuploader/static/main.css
+++ b/bh20simplewebuploader/static/main.css
@@ -178,7 +178,7 @@ span.dropt:hover {text-decoration: none; background: #ffffff; z-index: 6; }
.about {
display: grid;
- grid-template-columns: repeat(2, 1fr);
+ grid-template-columns: 1fr 1fr;
grid-auto-flow: row;
}
@@ -229,7 +229,7 @@ a {
#metadata_fill_form {
column-count: 4;
margin-top: 0.5em;
- column-width: 250px;
+ column-width: 15em;
}
.record, .record .field-group, .record .field-group .field {
@@ -238,6 +238,8 @@ a {
-webkit-column-break-inside: avoid; /* Chrome, Safari, Opera */
page-break-inside: avoid; /* Firefox */
break-inside: avoid;
+ display: block;
+ width: 90%;
}
.record {
@@ -258,6 +260,10 @@ a {
width: max-content;
}
+.control {
+ width: 100%;
+}
+
.filter-options {
width: 100%;
}
@@ -304,9 +310,10 @@ footer {
}
.sponsors img {
- width: 80%;
- display:block;
- margin:auto;
+ width: auto;
+ display: block;
+ margin: auto;
+ height: 4em;
}
.loader {
diff --git a/bh20simplewebuploader/templates/footer.html b/bh20simplewebuploader/templates/footer.html
index 37a6b64..5a1f3c9 100644
--- a/bh20simplewebuploader/templates/footer.html
+++ b/bh20simplewebuploader/templates/footer.html
@@ -21,11 +21,14 @@
+
--
cgit v1.2.3
From 474d15e17be63046a091615e89ba63adecdb109b Mon Sep 17 00:00:00 2001
From: Peter Amstutz
Date: Thu, 16 Jul 2020 14:28:02 -0400
Subject: Cleanup script also clears errors for revalidate
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz
---
scripts/cleanup.py | 22 ++++++++++++++++++++--
1 file changed, 20 insertions(+), 2 deletions(-)
diff --git a/scripts/cleanup.py b/scripts/cleanup.py
index f4bd0b4..6a82659 100644
--- a/scripts/cleanup.py
+++ b/scripts/cleanup.py
@@ -3,18 +3,36 @@ import arvados.util
api = arvados.api()
-patterns = [
+delete_patterns = [
"%missing%`collection_location`%",
"%missing%`technology`%",
"%missing%`host_species`%",
"%QC fail: alignment%",
"%does not look like a valid URI%",
+ "%Duplicate of%"
]
-for p in patterns:
+revalidate_patterns = [
+ "%missing%`license`%"
+]
+
+for p in delete_patterns:
c = arvados.util.list_all(api.collections().list, filters=[
["owner_uuid", "=", "lugli-j7d0g-n5clictpuvwk8aa"],
["properties.errors", "like", p]])
for i in c:
print("trashing %s %s" % (i["uuid"], i["properties"].get("sequence_label")))
api.collections().delete(uuid=i["uuid"]).execute()
+
+for p in revalidate_patterns:
+ c = arvados.util.list_all(api.collections().list, filters=[
+ ["owner_uuid", "=", "lugli-j7d0g-n5clictpuvwk8aa"],
+ ["properties.errors", "like", p]])
+ for i in c:
+ print("clearing status %s %s" % (i["uuid"], i["properties"].get("sequence_label")))
+ pr = i["properties"]
+ if "status" in pr:
+ del pr["status"]
+ if "errors" in pr:
+ del pr["errors"]
+ api.collections().update(uuid=i["uuid"], body={"properties": pr}).execute()
--
cgit v1.2.3
From d34374f0e822edd1539ea5de6f8522f2b761de3f Mon Sep 17 00:00:00 2001
From: Peter Amstutz
Date: Thu, 16 Jul 2020 14:48:22 -0400
Subject: Improve uploader reporting.
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz
---
bh20sequploader/main.py | 22 ++++++++++------------
bh20simplewebuploader/main.py | 4 ++--
bh20simplewebuploader/templates/error.html | 2 +-
bh20simplewebuploader/templates/success.html | 2 +-
4 files changed, 14 insertions(+), 16 deletions(-)
diff --git a/bh20sequploader/main.py b/bh20sequploader/main.py
index f744a8c..6049bf9 100644
--- a/bh20sequploader/main.py
+++ b/bh20sequploader/main.py
@@ -29,11 +29,10 @@ def qc_stuff(metadata, sequence_p1, sequence_p2, do_qc=True):
try:
log.debug("Checking metadata" if do_qc else "Skipping metadata check")
if do_qc and not qc_metadata(metadata.name):
- log.warning("Failed metadata qc")
+ log.warning("Failed metadata QC")
failed = True
except Exception as e:
- log.debug(e)
- print(e)
+ log.exception("Failed metadata QC")
failed = True
target = []
@@ -45,8 +44,7 @@ def qc_stuff(metadata, sequence_p1, sequence_p2, do_qc=True):
target[0] = ("reads_1."+target[0][0][6:], target[0][1])
target[1] = ("reads_2."+target[1][0][6:], target[0][1])
except Exception as e:
- log.debug(e)
- print(e)
+ log.exception("Failed sequence QC")
failed = True
if failed:
@@ -82,7 +80,7 @@ def main():
seqlabel = target[0][1]
if args.validate:
- print("Valid")
+ log.info("Valid")
exit(0)
col = arvados.collection.Collection(api_client=api)
@@ -91,10 +89,10 @@ def main():
if args.sequence_p2:
upload_sequence(col, target[1], args.sequence_p2)
- print("Reading metadata")
+ log.info("Reading metadata")
with col.open("metadata.yaml", "w") as f:
r = args.metadata.read(65536)
- print(r[0:20])
+ log.info(r[0:20])
while r:
f.write(r)
r = args.metadata.read(65536)
@@ -118,7 +116,7 @@ def main():
["portable_data_hash", "=", col.portable_data_hash()]]).execute()
if dup["items"]:
# This exact collection has been uploaded before.
- print("Duplicate of %s" % ([d["uuid"] for d in dup["items"]]))
+ log.error("Duplicate of %s" % ([d["uuid"] for d in dup["items"]]))
exit(1)
if args.trusted:
@@ -131,9 +129,9 @@ def main():
(seqlabel, properties['upload_user'], properties['upload_ip']),
properties=properties, ensure_unique_name=True)
- print("Saved to %s" % col.manifest_locator())
-
- print("Done")
+ log.info("Saved to %s" % col.manifest_locator())
+ log.info("Done")
+ exit(0)
if __name__ == "__main__":
main()
diff --git a/bh20simplewebuploader/main.py b/bh20simplewebuploader/main.py
index 62b68d9..c814f30 100644
--- a/bh20simplewebuploader/main.py
+++ b/bh20simplewebuploader/main.py
@@ -445,12 +445,12 @@ def receive_files():
if result.returncode != 0:
# It didn't work. Complain.
- error_message="Uploader returned value {} and said:".format(result.returncode) + str(result.stderr.decode('utf-8'))
+ error_message="Uploader returned value {} and said:\n".format(result.returncode) + str(result.stderr.decode('utf-8'))
print(error_message, file=sys.stderr)
return (render_template('error.html', error_message=error_message), 403)
else:
# It worked. Say so.
- return render_template('success.html', log=result.stdout.decode('utf-8', errors='replace'))
+ return render_template('success.html', log=result.stderr.decode('utf-8', errors='replace'))
finally:
shutil.rmtree(dest_dir)
diff --git a/bh20simplewebuploader/templates/error.html b/bh20simplewebuploader/templates/error.html
index b1d9402..fc08aed 100644
--- a/bh20simplewebuploader/templates/error.html
+++ b/bh20simplewebuploader/templates/error.html
@@ -15,7 +15,7 @@
- Click here to try again.
+ Click here to try again.