aboutsummaryrefslogtreecommitdiff
path: root/bh20seqanalyzer/main.py
diff options
context:
space:
mode:
authorPjotr Prins2020-07-17 11:08:15 +0100
committerPjotr Prins2020-07-17 11:08:15 +0100
commit16bb5df907c79cd0ce6bea0015821a2ce51fb992 (patch)
treeddb9677cddcc463bb514300189cbd4300b9117ed /bh20seqanalyzer/main.py
parent0be9983ef88fd3b925d8fa53e7f9ab2a28703bc0 (diff)
parentc69046ee9a5e24eadcd8cb885633328b0fd88011 (diff)
downloadbh20-seq-resource-16bb5df907c79cd0ce6bea0015821a2ce51fb992.tar.gz
bh20-seq-resource-16bb5df907c79cd0ce6bea0015821a2ce51fb992.tar.lz
bh20-seq-resource-16bb5df907c79cd0ce6bea0015821a2ce51fb992.zip
Merge branch 'master' into ebi-submit
Diffstat (limited to 'bh20seqanalyzer/main.py')
-rw-r--r--bh20seqanalyzer/main.py618
1 files changed, 322 insertions, 296 deletions
diff --git a/bh20seqanalyzer/main.py b/bh20seqanalyzer/main.py
index 0b52e6b..b3a439d 100644
--- a/bh20seqanalyzer/main.py
+++ b/bh20seqanalyzer/main.py
@@ -16,277 +16,308 @@ logging.basicConfig(format="[%(asctime)s] %(levelname)s %(message)s", datefmt="%
level=logging.INFO)
logging.getLogger("googleapiclient.discovery").setLevel(logging.WARN)
-def validate_upload(api, collection, validated_project,
- fastq_project, fastq_workflow_uuid,
- revalidate):
- col = arvados.collection.Collection(collection["uuid"])
-
- if not revalidate and collection["properties"].get("status") in ("validated", "rejected"):
- return False
-
- # validate the collection here. Check metadata, etc.
- logging.info("Validating upload '%s' (%s)" % (collection["name"], collection["uuid"]))
-
- errors = []
-
- if collection["owner_uuid"] != validated_project:
- dup = api.collections().list(filters=[["owner_uuid", "=", validated_project],
- ["portable_data_hash", "=", col.portable_data_hash()]]).execute()
- if dup["items"]:
- # This exact collection has been uploaded before.
- errors.append("Duplicate of %s" % ([d["uuid"] for d in dup["items"]]))
-
- if not errors:
- if "metadata.yaml" not in col:
- errors.append("Missing metadata.yaml", collection["name"])
- else:
+class SeqAnalyzer:
+
+ def __init__(self, api, keepclient,
+ uploader_project,
+ pangenome_analysis_project,
+ fastq_project,
+ validated_project,
+ workflow_def_project,
+ pangenome_workflow_uuid,
+ fastq_workflow_uuid,
+ exclude_list,
+ latest_result_collection):
+ self.api = api
+ self.keepclient = keepclient
+ self.uploader_project = uploader_project
+ self.pangenome_analysis_project = pangenome_analysis_project
+ self.fastq_project = fastq_project
+ self.validated_project = validated_project
+ self.workflow_def_project = workflow_def_project
+ self.pangenome_workflow_uuid = pangenome_workflow_uuid
+ self.fastq_workflow_uuid = fastq_workflow_uuid
+ self.exclude_list = exclude_list
+ self.latest_result_uuid = latest_result_collection
+ self.schema_ref = None
+
+ def validate_upload(self, collection, revalidate):
+ col = arvados.collection.Collection(collection["uuid"], api_client=self.api, keep_client=self.keepclient)
+
+ if not revalidate and collection["properties"].get("status") in ("validated", "rejected"):
+ return False
+
+ # validate the collection here. Check metadata, etc.
+ logging.info("Validating upload '%s' (%s)" % (collection["name"], collection["uuid"]))
+
+ errors = []
+
+ if collection["owner_uuid"] != self.validated_project:
+ dup = self.api.collections().list(filters=[["owner_uuid", "=", self.validated_project],
+ ["portable_data_hash", "=", col.portable_data_hash()]]).execute()
+ if dup["items"]:
+ # This exact collection has been uploaded before.
+ errors.append("Duplicate of %s" % ([d["uuid"] for d in dup["items"]]))
+
+ if not errors:
+ if "metadata.yaml" not in col:
+ errors.append("Missing metadata.yaml", collection["name"])
+ else:
+ try:
+ with col.open("metadata.yaml") as md:
+ metadata_content = ruamel.yaml.round_trip_load(md)
+ metadata_content["id"] = "http://arvados.org/keep:%s/metadata.yaml" % collection["portable_data_hash"]
+ sample_id = metadata_content["sample"]["sample_id"]
+ add_lc_filename(metadata_content, metadata_content["id"])
+ valid = qc_metadata(metadata_content)
+ if not valid:
+ errors.append("Failed metadata qc")
+ except Exception as e:
+ errors.append(str(e))
+
+ if not errors:
try:
- metadata_content = ruamel.yaml.round_trip_load(col.open("metadata.yaml"))
- metadata_content["id"] = "http://arvados.org/keep:%s/metadata.yaml" % collection["portable_data_hash"]
- sample_id = metadata_content["sample"]["sample_id"]
- add_lc_filename(metadata_content, metadata_content["id"])
- valid = qc_metadata(metadata_content)
- if not valid:
- errors.append("Failed metadata qc")
- except Exception as e:
- errors.append(str(e))
-
- if not errors:
- try:
- tgt = None
- paired = {"reads_1.fastq": "reads.fastq", "reads_1.fastq.gz": "reads.fastq.gz"}
- for n in ("sequence.fasta", "reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"):
- if n not in col:
- continue
- with col.open(n, 'rb') as qf:
- tgt = qc_fasta(qf)[0]
- if tgt != n and tgt != paired.get(n):
- errors.append("Expected %s but magic says it should be %s", n, tgt)
- elif tgt in ("reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"):
- start_fastq_to_fasta(api, collection, fastq_project, fastq_workflow_uuid, n, sample_id)
- return False
- if tgt is None:
- errors.append("Upload '%s' does not contain sequence.fasta, reads.fastq or reads_1.fastq", collection["name"])
- except Exception as v:
- errors.append(str(v))
-
-
- if not errors:
- # Move it to the "validated" project to be included in the next analysis
- if "errors" in collection["properties"]:
- del collection["properties"]["errors"]
- collection["properties"]["status"] = "validated"
- api.collections().update(uuid=collection["uuid"], body={
- "owner_uuid": validated_project,
- "name": "%s (%s)" % (collection["name"], time.asctime(time.gmtime())),
- "properties": collection["properties"]}).execute()
- logging.info("Added '%s' to validated sequences" % collection["name"])
- return True
- else:
- # It is invalid
- logging.warn("'%s' (%s) has validation errors: %s" % (
- collection["name"], collection["uuid"], "\n".join(errors)))
- collection["properties"]["status"] = "rejected"
- collection["properties"]["errors"] = errors
- api.collections().update(uuid=collection["uuid"], body={"properties": collection["properties"]}).execute()
- return False
-
-
-def run_workflow(api, parent_project, workflow_uuid, name, inputobj):
- project = api.groups().create(body={
- "group_class": "project",
- "name": name,
- "owner_uuid": parent_project,
- }, ensure_unique_name=True).execute()
-
- with tempfile.NamedTemporaryFile() as tmp:
- tmp.write(json.dumps(inputobj, indent=2).encode('utf-8'))
- tmp.flush()
- cmd = ["arvados-cwl-runner",
- "--submit",
- "--no-wait",
- "--project-uuid=%s" % project["uuid"],
- "arvwf:%s" % workflow_uuid,
- tmp.name]
- logging.info("Running %s" % ' '.join(cmd))
- comp = subprocess.run(cmd, capture_output=True)
- logging.info("Submitted %s", comp.stdout)
- if comp.returncode != 0:
- logging.error(comp.stderr.decode('utf-8'))
-
- return project
-
-
-def start_fastq_to_fasta(api, collection,
- analysis_project,
- fastq_workflow_uuid,
- tgt,
- sample_id):
-
- params = {
- "metadata": {
- "class": "File",
- "location": "keep:%s/metadata.yaml" % collection["portable_data_hash"]
- },
- "ref_fasta": {
- "class": "File",
- "location": "keep:ffef6a3b77e5e04f8f62a7b6f67264d1+556/SARS-CoV2-NC_045512.2.fasta"
- },
- "sample_id": sample_id
- }
-
- if tgt.startswith("reads.fastq"):
- params["fastq_forward"] = {
- "class": "File",
- "location": "keep:%s/%s" % (collection["portable_data_hash"], tgt)
- }
- elif tgt.startswith("reads_1.fastq"):
- params["fastq_forward"] = {
- "class": "File",
- "location": "keep:%s/reads_1.%s" % (collection["portable_data_hash"], tgt[8:])
- }
- params["fastq_reverse"] = {
- "class": "File",
- "location": "keep:%s/reads_2.%s" % (collection["portable_data_hash"], tgt[8:])
+ tgt = None
+ paired = {"reads_1.fastq": "reads.fastq", "reads_1.fastq.gz": "reads.fastq.gz"}
+ for n in ("sequence.fasta", "reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"):
+ if n not in col:
+ continue
+ with col.open(n, 'rb') as qf:
+ tgt = qc_fasta(qf)[0]
+ if tgt != n and tgt != paired.get(n):
+ errors.append("Expected %s but magic says it should be %s", n, tgt)
+ elif tgt in ("reads.fastq", "reads.fastq.gz", "reads_1.fastq", "reads_1.fastq.gz"):
+ self.start_fastq_to_fasta(collection, n, sample_id)
+ return False
+ if tgt is None:
+ errors.append("Upload '%s' does not contain sequence.fasta, reads.fastq or reads_1.fastq", collection["name"])
+ except Exception as v:
+ errors.append(str(v))
+
+
+ if not errors:
+ # Move it to the "validated" project to be included in the next analysis
+ if "errors" in collection["properties"]:
+ del collection["properties"]["errors"]
+ collection["properties"]["status"] = "validated"
+ self.api.collections().update(uuid=collection["uuid"], body={
+ "owner_uuid": self.validated_project,
+ "name": "%s (%s)" % (collection["name"], time.asctime(time.gmtime())),
+ "properties": collection["properties"]}).execute()
+ logging.info("Added '%s' to validated sequences" % collection["name"])
+ return True
+ else:
+ # It is invalid
+ logging.warn("'%s' (%s) has validation errors: %s" % (
+ collection["name"], collection["uuid"], "\n".join(errors)))
+ collection["properties"]["status"] = "rejected"
+ collection["properties"]["errors"] = errors
+ self.api.collections().update(uuid=collection["uuid"], body={"properties": collection["properties"]}).execute()
+ return False
+
+
+ def run_workflow(self, parent_project, workflow_uuid, name, inputobj):
+ project = self.api.groups().create(body={
+ "group_class": "project",
+ "name": name,
+ "owner_uuid": parent_project,
+ }, ensure_unique_name=True).execute()
+
+ with tempfile.NamedTemporaryFile() as tmp:
+ tmp.write(json.dumps(inputobj, indent=2).encode('utf-8'))
+ tmp.flush()
+ cmd = ["arvados-cwl-runner",
+ "--submit",
+ "--no-wait",
+ "--project-uuid=%s" % project["uuid"],
+ "arvwf:%s" % workflow_uuid,
+ tmp.name]
+ logging.info("Running %s" % ' '.join(cmd))
+ comp = subprocess.run(cmd, capture_output=True)
+ logging.info("Submitted %s", comp.stdout)
+ if comp.returncode != 0:
+ logging.error(comp.stderr.decode('utf-8'))
+
+ return project
+
+
+ def start_fastq_to_fasta(self, collection,
+ tgt,
+ sample_id):
+
+ params = {
+ "metadata": {
+ "class": "File",
+ "location": "keep:%s/metadata.yaml" % collection["portable_data_hash"]
+ },
+ "ref_fasta": {
+ "class": "File",
+ "location": "keep:ffef6a3b77e5e04f8f62a7b6f67264d1+556/SARS-CoV2-NC_045512.2.fasta"
+ },
+ "sample_id": sample_id
}
- newproject = run_workflow(api, analysis_project, fastq_workflow_uuid, "FASTQ to FASTA", params)
- api.collections().update(uuid=collection["uuid"],
- body={"owner_uuid": newproject["uuid"]}).execute()
-
-def start_pangenome_analysis(api,
- analysis_project,
- pangenome_workflow_uuid,
- validated_project,
- schema_ref,
- exclude_list):
- validated = arvados.util.list_all(api.collections().list, filters=[
- ["owner_uuid", "=", validated_project],
- ["properties.status", "=", "validated"]])
- inputobj = {
- "inputReads": [],
- "metadata": [],
- "subjects": [],
- "metadataSchema": {
- "class": "File",
- "location": schema_ref
- },
- "exclude": {
- "class": "File",
- "location": exclude_list
+ if tgt.startswith("reads.fastq"):
+ params["fastq_forward"] = {
+ "class": "File",
+ "location": "keep:%s/%s" % (collection["portable_data_hash"], tgt)
+ }
+ elif tgt.startswith("reads_1.fastq"):
+ params["fastq_forward"] = {
+ "class": "File",
+ "location": "keep:%s/reads_1.%s" % (collection["portable_data_hash"], tgt[8:])
+ }
+ params["fastq_reverse"] = {
+ "class": "File",
+ "location": "keep:%s/reads_2.%s" % (collection["portable_data_hash"], tgt[8:])
+ }
+
+ newproject = self.run_workflow(self.fastq_project, self.fastq_workflow_uuid, "FASTQ to FASTA", params)
+ self.api.collections().update(uuid=collection["uuid"],
+ body={"owner_uuid": newproject["uuid"]}).execute()
+
+ def start_pangenome_analysis(self):
+
+ if self.schema_ref is None:
+ self.upload_schema()
+
+ validated = arvados.util.list_all(self.api.collections().list, filters=[
+ ["owner_uuid", "=", self.validated_project],
+ ["properties.status", "=", "validated"]])
+ inputobj = {
+ "inputReads": [],
+ "metadata": [],
+ "subjects": [],
+ "metadataSchema": {
+ "class": "File",
+ "location": self.schema_ref
+ },
+ "exclude": {
+ "class": "File",
+ "location": self.exclude_list
+ }
}
- }
- validated.sort(key=lambda v: v["portable_data_hash"])
- for v in validated:
- inputobj["inputReads"].append({
- "class": "File",
- "location": "keep:%s/sequence.fasta" % v["portable_data_hash"]
- })
- inputobj["metadata"].append({
- "class": "File",
- "location": "keep:%s/metadata.yaml" % v["portable_data_hash"]
- })
- inputobj["subjects"].append("http://collections.lugli.arvadosapi.com/c=%s/sequence.fasta" % v["portable_data_hash"])
- run_workflow(api, analysis_project, pangenome_workflow_uuid, "Pangenome analysis", inputobj)
-
-
-def get_workflow_output_from_project(api, uuid):
- cr = api.container_requests().list(filters=[['owner_uuid', '=', uuid],
- ["requesting_container_uuid", "=", None]]).execute()
- if cr["items"] and cr["items"][0]["output_uuid"]:
- container = api.containers().get(uuid=cr["items"][0]["container_uuid"]).execute()
- if container["state"] == "Complete" and container["exit_code"] == 0:
- return cr["items"][0]
- return None
-
-
-def copy_most_recent_result(api, analysis_project, latest_result_uuid):
- most_recent_analysis = api.groups().list(filters=[['owner_uuid', '=', analysis_project]],
- order="created_at desc").execute()
- for m in most_recent_analysis["items"]:
- wf = get_workflow_output_from_project(api, m["uuid"])
- if wf:
- src = api.collections().get(uuid=wf["output_uuid"]).execute()
- dst = api.collections().get(uuid=latest_result_uuid).execute()
- if src["portable_data_hash"] != dst["portable_data_hash"]:
- logging.info("Copying latest result from '%s' to %s", m["name"], latest_result_uuid)
- api.collections().update(uuid=latest_result_uuid,
- body={"manifest_text": src["manifest_text"],
- "description": "Result from %s %s" % (m["name"], wf["uuid"])}).execute()
- break
-
+ validated.sort(key=lambda v: v["portable_data_hash"])
+ for v in validated:
+ inputobj["inputReads"].append({
+ "class": "File",
+ "location": "keep:%s/sequence.fasta" % v["portable_data_hash"]
+ })
+ inputobj["metadata"].append({
+ "class": "File",
+ "location": "keep:%s/metadata.yaml" % v["portable_data_hash"]
+ })
+ inputobj["subjects"].append("http://collections.lugli.arvadosapi.com/c=%s/sequence.fasta" % v["portable_data_hash"])
+ self.run_workflow(self.pangenome_analysis_project, self.pangenome_workflow_uuid, "Pangenome analysis", inputobj)
+
+
+ def get_workflow_output_from_project(self, uuid):
+ cr = self.api.container_requests().list(filters=[['owner_uuid', '=', uuid],
+ ["requesting_container_uuid", "=", None]]).execute()
+ if cr["items"] and cr["items"][0]["output_uuid"]:
+ container = self.api.containers().get(uuid=cr["items"][0]["container_uuid"]).execute()
+ if container["state"] == "Complete" and container["exit_code"] == 0:
+ return cr["items"][0]
+ return None
+
+
+ def copy_most_recent_result(self):
+ most_recent_analysis = self.api.groups().list(filters=[['owner_uuid', '=', self.pangenome_analysis_project]],
+ order="created_at desc").execute()
+ for m in most_recent_analysis["items"]:
+ wf = self.get_workflow_output_from_project(m["uuid"])
+ if wf:
+ src = self.api.collections().get(uuid=wf["output_uuid"]).execute()
+ dst = self.api.collections().get(uuid=self.latest_result_uuid).execute()
+ if src["portable_data_hash"] != dst["portable_data_hash"]:
+ logging.info("Copying latest result from '%s' to %s", m["name"], self.latest_result_uuid)
+ self.api.collections().update(uuid=self.latest_result_uuid,
+ body={"manifest_text": src["manifest_text"],
+ "description": "Result from %s %s" % (m["name"], wf["uuid"])}).execute()
+ break
+
+
+ def move_fastq_to_fasta_results(self):
+ projects = self.api.groups().list(filters=[['owner_uuid', '=', self.fastq_project],
+ ["properties.moved_output", "!=", True]],
+ order="created_at asc",).execute()
+ for p in projects["items"]:
+ wf = self.get_workflow_output_from_project(p["uuid"])
+ if not wf:
+ continue
-def move_fastq_to_fasta_results(api, analysis_project, uploader_project):
- projects = api.groups().list(filters=[['owner_uuid', '=', analysis_project],
- ["properties.moved_output", "!=", True]],
- order="created_at desc",).execute()
- for p in projects["items"]:
- wf = get_workflow_output_from_project(api, p["uuid"])
- if wf:
logging.info("Moving completed fastq2fasta result %s back to uploader project", wf["output_uuid"])
- api.collections().update(uuid=wf["output_uuid"],
- body={"owner_uuid": uploader_project}).execute()
- p["properties"]["moved_output"] = True
- api.groups().update(uuid=p["uuid"], body={"properties": p["properties"]}).execute()
- break
+ col = arvados.collection.Collection(wf["output_uuid"], api_client=self.api, keep_client=self.keepclient)
+ with col.open("metadata.yaml") as md:
+ metadata_content = ruamel.yaml.round_trip_load(md)
+
+ colprop = col.get_properties()
+ colprop["sequence_label"] = metadata_content["sample"]["sample_id"]
+ self.api.collections().update(uuid=wf["output_uuid"],
+ body={"owner_uuid": self.uploader_project,
+ "properties": colprop}).execute()
-def upload_schema(api, workflow_def_project):
- schema_resource = pkg_resources.resource_stream('bh20sequploader.qc_metadata', "bh20seq-schema.yml")
- c = arvados.collection.Collection()
- with c.open("schema.yml", "wb") as f:
- f.write(schema_resource.read())
- pdh = c.portable_data_hash()
- wd = api.collections().list(filters=[["owner_uuid", "=", workflow_def_project],
- ["portable_data_hash", "=", pdh]]).execute()
- if len(wd["items"]) == 0:
- c.save_new(owner_uuid=workflow_def_project, name="Metadata schema", ensure_unique_name=True)
- return "keep:%s/schema.yml" % pdh
-
-
-def print_status(api, uploader_project, fmt):
- pending = arvados.util.list_all(api.collections().list, filters=[["owner_uuid", "=", uploader_project]])
- out = []
- status = {}
- for p in pending:
- prop = p["properties"]
- out.append(prop)
- if "status" not in prop:
- prop["status"] = "pending"
- prop["created_at"] = p["created_at"]
- prop["uuid"] = p["uuid"]
- status[prop["status"]] = status.get(prop["status"], 0) + 1
- if fmt == "html":
- print(
-"""
-<html>
-<body>
-""")
- print("<p>Total collections in upload project %s</p>" % len(out))
- print("<p>Status %s</p>" % status)
- print(
-"""
-<table>
-<tr><th>Collection</th>
-<th>Sequence label</th>
-<th>Status</th>
-<th>Errors</th></tr>
-""")
- for r in out:
- print("<tr valign='top'>")
- print("<td><a href='https://workbench.lugli.arvadosapi.com/collections/%s'>%s</a></td>" % (r["uuid"], r["uuid"]))
- print("<td>%s</td>" % r["sequence_label"])
- print("<td>%s</td>" % r["status"])
- print("<td><pre>%s</pre></td>" % "\n".join(r.get("errors", [])))
- print("</tr>")
- print(
-"""
-</table>
-</body>
-</html>
-""")
- else:
- print(json.dumps(out, indent=2))
+ p["properties"]["moved_output"] = True
+ self.api.groups().update(uuid=p["uuid"], body={"properties": p["properties"]}).execute()
+
+
+ def upload_schema(self):
+ schema_resource = pkg_resources.resource_stream('bh20sequploader.qc_metadata', "bh20seq-schema.yml")
+ c = arvados.collection.Collection(api_client=self.api, keep_client=self.keepclient)
+ with c.open("schema.yml", "wb") as f:
+ f.write(schema_resource.read())
+ pdh = c.portable_data_hash()
+ wd = self.api.collections().list(filters=[["owner_uuid", "=", self.workflow_def_project],
+ ["portable_data_hash", "=", pdh]]).execute()
+ if len(wd["items"]) == 0:
+ c.save_new(owner_uuid=self.workflow_def_project, name="Metadata schema", ensure_unique_name=True)
+ self.schema_ref = "keep:%s/schema.yml" % pdh
+
+
+ def print_status(self, fmt):
+ pending = arvados.util.list_all(self.api.collections().list, filters=[["owner_uuid", "=", self.uploader_project]])
+ out = []
+ status = {}
+ for p in pending:
+ prop = p["properties"]
+ out.append(prop)
+ if "status" not in prop:
+ prop["status"] = "pending"
+ prop["created_at"] = p["created_at"]
+ prop["uuid"] = p["uuid"]
+ status[prop["status"]] = status.get(prop["status"], 0) + 1
+ if fmt == "html":
+ print(
+ """
+ <html>
+ <body>
+ """)
+ print("<p>Total collections in upload project %s</p>" % len(out))
+ print("<p>Status %s</p>" % status)
+ print(
+ """
+ <table>
+ <tr><th>Collection</th>
+ <th>Sequence label</th>
+ <th>Status</th>
+ <th>Errors</th></tr>
+ """)
+ for r in out:
+ print("<tr valign='top'>")
+ print("<td><a href='https://workbench.lugli.arvadosapi.com/collections/%s'>%s</a></td>" % (r["uuid"], r["uuid"]))
+ print("<td>%s</td>" % r["sequence_label"])
+ print("<td>%s</td>" % r["status"])
+ print("<td><pre>%s</pre></td>" % "\n".join(r.get("errors", [])))
+ print("</tr>")
+ print(
+ """
+ </table>
+ </body>
+ </html>
+ """)
+ else:
+ print(json.dumps(out, indent=2))
def main():
parser = argparse.ArgumentParser(description='Analyze collections uploaded to a project')
@@ -310,50 +341,45 @@ def main():
args = parser.parse_args()
api = arvados.api()
-
-
-
- schema_ref = upload_schema(api, args.workflow_def_project)
+ keepclient = arvados.keep.KeepClient(api_client=api)
+
+ seqanalyzer = SeqAnalyzer(api, keepclient,
+ args.uploader_project,
+ args.pangenome_analysis_project,
+ args.fastq_project,
+ args.validated_project,
+ args.workflow_def_project,
+ args.pangenome_workflow_uuid,
+ args.fastq_workflow_uuid,
+ args.exclude_list,
+ args.latest_result_collection)
if args.kickoff:
logging.info("Starting a single analysis run")
- start_pangenome_analysis(api,
- args.pangenome_analysis_project,
- args.pangenome_workflow_uuid,
- args.validated_project,
- schema_ref,
- args.exclude_list)
+ seqanalyzer.start_pangenome_analysis()
return
if args.print_status:
- print_status(api, args.uploader_project, args.print_status)
+ seqanalyzer.print_status(args.print_status)
exit(0)
logging.info("Starting up, monitoring %s for uploads" % (args.uploader_project))
while True:
- move_fastq_to_fasta_results(api, args.fastq_project, args.uploader_project)
-
- new_collections = arvados.util.list_all(api.collections().list, filters=[["owner_uuid", "=", args.uploader_project]])
- at_least_one_new_valid_seq = False
- for c in new_collections:
- at_least_one_new_valid_seq = validate_upload(api, c,
- args.validated_project,
- args.fastq_project,
- args.fastq_workflow_uuid,
- args.revalidate) or at_least_one_new_valid_seq
-
- if at_least_one_new_valid_seq and not args.no_start_analysis:
- start_pangenome_analysis(api,
- args.pangenome_analysis_project,
- args.pangenome_workflow_uuid,
- args.validated_project,
- schema_ref,
- args.exclude_list)
-
- copy_most_recent_result(api,
- args.pangenome_analysis_project,
- args.latest_result_collection)
+ try:
+ seqanalyzer.move_fastq_to_fasta_results()
+
+ new_collections = arvados.util.list_all(api.collections().list, filters=[["owner_uuid", "=", args.uploader_project]])
+ at_least_one_new_valid_seq = False
+ for c in new_collections:
+ at_least_one_new_valid_seq = seqanalyzer.validate_upload(c, args.revalidate) or at_least_one_new_valid_seq
+
+ if at_least_one_new_valid_seq and not args.no_start_analysis:
+ seqanalyzer.start_pangenome_analysis()
+
+ seqanalyzer.copy_most_recent_result()
+ except Exception as e:
+ logging.exeception("Error in main loop")
if args.once:
break