about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--bh20seqanalyzer/main.py83
-rw-r--r--bh20sequploader/main.py3
-rw-r--r--setup.py2
3 files changed, 69 insertions, 19 deletions
diff --git a/bh20seqanalyzer/main.py b/bh20seqanalyzer/main.py
index 23e58e9..dae8eca 100644
--- a/bh20seqanalyzer/main.py
+++ b/bh20seqanalyzer/main.py
@@ -1,29 +1,70 @@
 import argparse
 import arvados
+import arvados.collection
 import time
 import subprocess
 import tempfile
 import json
+import logging
+
+logging.basicConfig(format="[%(asctime)s] %(levelname)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S",
+                    level=logging.INFO)
+logging.getLogger("googleapiclient.discovery").setLevel(logging.WARN)
+
+def validate_upload(api, collection, validated_project):
+    col = arvados.collection.Collection(collection["uuid"])
+
+    # validate the collection here.  Check metadata, etc.
+    valid = True
+
+    if "sequence.fasta" not in col:
+        valid = False
+        logging.warn("Upload '%s' missing sequence.fasta", collection["name"])
+    if "metadata.jsonld" not in col:
+        logging.warn("Upload '%s' missing metadata.jsonld", collection["name"])
+        valid = False
+
+    dup = api.collections().list(filters=[["owner_uuid", "=", validated_project],
+                                          ["portable_data_hash", "=", col.portable_data_hash()]]).execute()
+    if dup["items"]:
+        # This exact collection has been uploaded before.
+        valid = False
+        logging.warn("Upload '%s' is duplicate" % collection["name"])
+
+    if valid:
+        logging.info("Added '%s' to validated sequences" % collection["name"])
+        # Move it to the "validated" project to be included in the next analysis
+        api.collections().update(uuid=collection["uuid"], body={"owner_uuid": validated_project}).execute()
+    else:
+        # It is invalid, delete it.
+        logging.warn("Deleting '%s'" % collection["name"])
+        api.collections().delete(uuid=collection["uuid"]).execute()
+
+    return valid
+
+def start_analysis(api,
+                   analysis_project,
+                   workflow_uuid,
+                   validated_project):
 
-def start_analysis(api, collection, analysis_project, workflow_uuid):
     project = api.groups().create(body={
         "group_class": "project",
-        "name": "Analysis of %s" % collection["name"],
+        "name": "Pangenome analysis",
         "owner_uuid": analysis_project,
     }, ensure_unique_name=True).execute()
 
+    validated = arvados.util.list_all(api.collections().list, filters=[["owner_uuid", "=", validated_project]])
+
     with tempfile.NamedTemporaryFile() as tmp:
-        inputobj = json.dumps({
-            "sequence": {
+        inputobj = {
+            "inputReads": []
+        }
+        for v in validated:
+            inputobj["inputReads"].append({
                 "class": "File",
-                "location": "keep:%s/sequence.fasta" % collection["portable_data_hash"]
-            },
-            "metadata": {
-                "class": "File",
-                "location": "keep:%s/metadata.jsonld" % collection["portable_data_hash"]
-            }
-        }, indent=2)
-        tmp.write(inputobj.encode('utf-8'))
+                "location": "keep:%s/sequence.fasta" % v["portable_data_hash"]
+            })
+        tmp.write(json.dumps(inputobj, indent=2).encode('utf-8'))
         tmp.flush()
         cmd = ["arvados-cwl-runner",
                "--submit",
@@ -32,24 +73,32 @@ def start_analysis(api, collection, analysis_project, workflow_uuid):
                "--project-uuid=%s" % project["uuid"],
                "arvwf:%s" % workflow_uuid,
                tmp.name]
-        print("Running %s" % ' '.join(cmd))
+        logging.info("Running %s" % ' '.join(cmd))
         comp = subprocess.run(cmd, capture_output=True)
     if comp.returncode != 0:
-        print(comp.stderr.decode('utf-8'))
-    else:
-        api.collections().update(uuid=collection["uuid"], body={"owner_uuid": project['uuid']}).execute()
+        logging.error(comp.stderr.decode('utf-8'))
+
 
 def main():
     parser = argparse.ArgumentParser(description='Analyze collections uploaded to a project')
     parser.add_argument('--uploader-project', type=str, default='lugli-j7d0g-n5clictpuvwk8aa', help='')
     parser.add_argument('--analysis-project', type=str, default='lugli-j7d0g-y4k4uswcqi3ku56', help='')
+    parser.add_argument('--validated-project', type=str, default='lugli-j7d0g-5ct8p1i1wrgyjvp', help='')
     parser.add_argument('--workflow-uuid', type=str, default='lugli-7fd4e-mqfu9y3ofnpnho1', help='')
     args = parser.parse_args()
 
     api = arvados.api()
 
+    logging.info("Starting up, monitoring %s for uploads" % (args.uploader_project))
+
     while True:
         new_collections = api.collections().list(filters=[['owner_uuid', '=', args.uploader_project]]).execute()
+        at_least_one_new_valid_seq = False
         for c in new_collections["items"]:
-            start_analysis(api, c, args.analysis_project, args.workflow_uuid)
+            at_least_one_new_valid_seq = validate_upload(api, c, args.validated_project) or at_least_one_new_valid_seq
+
+        if at_least_one_new_valid_seq:
+            start_analysis(api, args.analysis_project,
+                           args.workflow_uuid,
+                           args.validated_project)
         time.sleep(10)
diff --git a/bh20sequploader/main.py b/bh20sequploader/main.py
index 17ad492..d3ebc0c 100644
--- a/bh20sequploader/main.py
+++ b/bh20sequploader/main.py
@@ -49,4 +49,5 @@ def main():
                  (properties['upload_user'], properties['upload_ip']),
                  properties=properties, ensure_unique_name=True)
 
-main()
+if __name__ == "__main__":
+    main()
diff --git a/setup.py b/setup.py
index 9e73ff0..0685d37 100644
--- a/setup.py
+++ b/setup.py
@@ -6,7 +6,7 @@ import setuptools.command.egg_info as egg_info_cmd
 from setuptools import setup
 
 SETUP_DIR = os.path.dirname(__file__)
-README = os.path.join(SETUP_DIR, "README.rst")
+README = os.path.join(SETUP_DIR, "README.md")
 
 try:
     import gittaggers