1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
import argparse
import arvados
import time
import subprocess
import tempfile
import json
def start_analysis(api, collection, analysis_project, workflow_uuid):
project = api.groups().create(body={
"group_class": "project",
"name": "Analysis of %s" % collection["name"],
"owner_uuid": analysis_project,
}, ensure_unique_name=True).execute()
with tempfile.NamedTemporaryFile() as tmp:
inputobj = json.dumps({
"sequence": {
"class": "File",
"location": "keep:%s/sequence.fasta" % collection["portable_data_hash"]
},
"metadata": {
"class": "File",
"location": "keep:%s/metadata.jsonld" % collection["portable_data_hash"]
}
}, indent=2)
tmp.write(inputobj.encode('utf-8'))
tmp.flush()
cmd = ["arvados-cwl-runner",
"--submit",
"--no-wait",
"--debug",
"--project-uuid=%s" % project["uuid"],
"arvwf:%s" % workflow_uuid,
tmp.name]
print("Running %s" % ' '.join(cmd))
comp = subprocess.run(cmd, capture_output=True)
if comp.returncode != 0:
print(comp.stderr.decode('utf-8'))
else:
api.collections().update(uuid=collection["uuid"], body={"owner_uuid": project['uuid']}).execute()
def main():
parser = argparse.ArgumentParser(description='Analyze collections uploaded to a project')
parser.add_argument('--uploader-project', type=str, default='lugli-j7d0g-n5clictpuvwk8aa', help='')
parser.add_argument('--analysis-project', type=str, default='lugli-j7d0g-y4k4uswcqi3ku56', help='')
parser.add_argument('--workflow-uuid', type=str, default='lugli-7fd4e-mqfu9y3ofnpnho1', help='')
args = parser.parse_args()
api = arvados.api()
while True:
new_collections = api.collections().list(filters=[['owner_uuid', '=', args.uploader_project]]).execute()
for c in new_collections["items"]:
start_analysis(api, c, args.analysis_project, args.workflow_uuid)
time.sleep(10)
|