path: root/bh20seqanalyzer/main.py
import argparse
import json
import logging
import subprocess
import tempfile
import time

import arvados
import arvados.collection
import arvados.util

logging.basicConfig(format="[%(asctime)s] %(levelname)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S",
                    level=logging.INFO)
logging.getLogger("googleapiclient.discovery").setLevel(logging.WARNING)

def validate_upload(api, collection, validated_project):
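    """Check an uploaded collection for required files and duplicates.

    Valid collections are moved into validated_project; invalid ones are
    deleted.  Returns True if the collection passed validation.
    """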
    col = arvados.collection.Collection(collection["uuid"])

    # Validate the collection: check that the required files are present
    # and that the same content has not been uploaded before.
    valid = True

    if "sequence.fasta" not in col:
        valid = False
        logging.warn("Upload '%s' missing sequence.fasta", collection["name"])
    if "metadata.jsonld" not in col:
        logging.warn("Upload '%s' missing metadata.jsonld", collection["name"])
        valid = False

    dup = api.collections().list(filters=[["owner_uuid", "=", validated_project],
                                          ["portable_data_hash", "=", col.portable_data_hash()]]).execute()
    if dup["items"]:
        # This exact collection has been uploaded before.
        valid = False
        logging.warn("Upload '%s' is duplicate" % collection["name"])

    if valid:
        logging.info("Adding '%s' to validated sequences", collection["name"])
        # Move it to the "validated" project to be included in the next analysis
        api.collections().update(uuid=collection["uuid"], body={"owner_uuid": validated_project}).execute()
    else:
        # It is invalid, delete it.
        logging.warn("Deleting '%s'" % collection["name"])
        api.collections().delete(uuid=collection["uuid"]).execute()

    return valid

def start_analysis(api,
                   analysis_project,
                   workflow_uuid,
                   validated_project):
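    """Create a new analysis subproject and submit the pangenome workflow
    over every collection currently in validated_project."""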

    project = api.groups().create(body={
        "group_class": "project",
        "name": "Pangenome analysis",
        "owner_uuid": analysis_project,
    }, ensure_unique_name=True).execute()

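    # Gather every validated sequence collection to use as workflow input.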
    validated = arvados.util.list_all(api.collections().list, filters=[["owner_uuid", "=", validated_project]])

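    # Write the workflow inputs to a temporary JSON file for arvados-cwl-runner.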
    with tempfile.NamedTemporaryFile() as tmp:
        inputobj = {
            "inputReads": []
        }
        for v in validated:
            inputobj["inputReads"].append({
                "class": "File",
                "location": "keep:%s/sequence.fasta" % v["portable_data_hash"]
            })
        tmp.write(json.dumps(inputobj, indent=2).encode('utf-8'))
        tmp.flush()
        cmd = ["arvados-cwl-runner",
               "--submit",
               "--no-wait",
               "--debug",
               "--project-uuid=%s" % project["uuid"],
               "arvwf:%s" % workflow_uuid,
               tmp.name]
        logging.info("Running %s" % ' '.join(cmd))
        comp = subprocess.run(cmd, capture_output=True)
    if comp.returncode != 0:
        logging.error(comp.stderr.decode('utf-8'))


def copy_most_recent_result(api, analysis_project, latest_result_uuid):
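    """Copy the newest analysis output into the fixed latest-result collection.

    Only copies when the output's portable data hash differs from what the
    latest-result collection already holds.
    """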
    most_recent_analysis = api.groups().list(filters=[['owner_uuid', '=', analysis_project]],
                                             order="created_at desc", limit=1).execute()
    for m in most_recent_analysis["items"]:
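        # The workflow's top-level container request is the one with no
        # requesting container.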
        cr = api.container_requests().list(filters=[['owner_uuid', '=', m["uuid"]],
                                                    ["requesting_container_uuid", "=", None]]).execute()
        if cr["items"] and cr["items"][0]["output_uuid"]:
            wf = cr["items"][0]
            src = api.collections().get(uuid=wf["output_uuid"]).execute()
            dst = api.collections().get(uuid=latest_result_uuid).execute()
            if src["portable_data_hash"] != dst["portable_data_hash"]:
                logging.info("Copying latest result from '%s' to %s", m["name"], latest_result_uuid)
                api.collections().update(uuid=latest_result_uuid,
                                         body={"manifest_text": src["manifest_text"],
                                               "description": "latest result from %s %s" % (m["name"], wf["uuid"])}).execute()
            break


def main():
    parser = argparse.ArgumentParser(description='Analyze collections uploaded to a project')
    parser.add_argument('--uploader-project', type=str, default='lugli-j7d0g-n5clictpuvwk8aa',
                        help='Project to monitor for new uploads')
    parser.add_argument('--analysis-project', type=str, default='lugli-j7d0g-y4k4uswcqi3ku56',
                        help='Project in which to create analysis runs')
    parser.add_argument('--validated-project', type=str, default='lugli-j7d0g-5ct8p1i1wrgyjvp',
                        help='Project that holds validated sequence collections')
    parser.add_argument('--workflow-uuid', type=str, default='lugli-7fd4e-mqfu9y3ofnpnho1',
                        help='UUID of the workflow to run on validated sequences')
    parser.add_argument('--latest-result-uuid', type=str, default='lugli-4zz18-z513nlpqm03hpca',
                        help='Collection to update with the most recent analysis result')
    args = parser.parse_args()

    api = arvados.api()

    logging.info("Starting up, monitoring %s for uploads" % (args.uploader_project))

    while True:
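        # Poll the uploader project for newly uploaded collections.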
        new_collections = api.collections().list(filters=[['owner_uuid', '=', args.uploader_project]]).execute()
        at_least_one_new_valid_seq = False
        for c in new_collections["items"]:
            at_least_one_new_valid_seq = validate_upload(api, c, args.validated_project) or at_least_one_new_valid_seq

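        # Start a fresh analysis run only when something new passed validation.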
        if at_least_one_new_valid_seq:
            start_analysis(api, args.analysis_project,
                           args.workflow_uuid,
                           args.validated_project)

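        # Publish the newest completed output to the stable latest-result collection.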
        copy_most_recent_result(api, args.analysis_project, args.latest_result_uuid)

        time.sleep(10)
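

if __name__ == "__main__":
    # Allow running this module directly; the package may also install
    # main() as a console-script entry point.
    main()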