aboutsummaryrefslogtreecommitdiff
path: root/bh20sequploader/main.py
blob: e2f089f50a943bf0c6d51c3782b773da3711148f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# This script is the CLI uploader for http://covid19.genenetwork.org/
#
# To upload a sequence with its metadata:
#
#   python3 bh20sequploader/main.py example/sequence.fasta example/maximum_metadata_example.yaml
#
# Usage: described in http://covid19.genenetwork.org/blog?id=using-covid-19-pubseq-part3

import argparse
import time
import arvados
import arvados.collection
import json
import logging
import magic
from pathlib import Path
import urllib.request
import socket
import getpass
import sys
sys.path.insert(0,'.')
from bh20sequploader.qc_metadata import qc_metadata
from bh20sequploader.qc_fasta import qc_fasta

logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger(__name__ )
log.debug("Entering sequence uploader")

# ---- Tokens for uploading data to Arvados
ARVADOS_API_HOST='lugli.arvadosapi.com'
UPLOADER_API_TOKEN='2fbebpmbo3rw3x05ueu2i6nx70zhrsb1p22ycu3ry34m4x4462'
ANONYMOUS_API_TOKEN='5o42qdxpxp5cj15jqjf7vnxx5xduhm4ret703suuoa3ivfglfh'
UPLOAD_PROJECT='lugli-j7d0g-n5clictpuvwk8aa'
VALIDATED_PROJECT='lugli-j7d0g-5ct8p1i1wrgyjvp'

def qc_stuff(metadata, sequence_p1, sequence_p2, do_qc=True):
    """Quality control. Essentially it checks the RDF schema and the FASTA
sequence for enough overlap with the reference genome

    """
    failed = False
    sample_id = ''
    try:
        log.debug("Checking metadata" if do_qc else "Skipping metadata check")
        if do_qc:
            sample_id = qc_metadata(metadata.name)
            if not sample_id:
                log.warning("Failed metadata QC")
                failed = True
    except Exception as e:
        log.exception("Failed metadata QC")
        failed = True # continue with the FASTA checker

    target = []
    if sequence_p1:
        try:
            log.debug("FASTA/FASTQ QC" if do_qc else "Limited FASTA/FASTQ QC")
            target.append(qc_fasta(sequence_p1, check_with_mimimap2=do_qc))
            if sequence_p2:
                if target[0][2] == 'text/fasta':
                    raise ValueError("It is possible to upload just one FASTA file at a time")
                target.append(qc_fasta(sequence_p2))

                target[0] = ("reads_1."+target[0][0][6:], target[0][1], target[0][2])
                target[1] = ("reads_2."+target[1][0][6:], target[1][1], target[1][2])

            if do_qc and target[0][2] == 'text/fasta' and sample_id != target[0][1]:
                raise ValueError(f"The sample_id field in the metadata ({sample_id}) must be the same as the FASTA header ({target[0][1]})")
        except Exception as e:
            log.exception("Failed sequence QC")
            failed = True

    if failed:
        log.debug("Bailing out!")
        exit(1)

    return target

def upload_sequence(col, target, sequence):
    with col.open(target[0], "wb") as f:
        r = sequence.read(65536)
        while r:
            f.write(r)
            r = sequence.read(65536)


def main():
    parser = argparse.ArgumentParser(description='Upload SARS-CoV-19 sequences for analysis')
    parser.add_argument('metadata', type=argparse.FileType('r'), help='sequence metadata json')
    parser.add_argument('sequence_p1', type=argparse.FileType('rb'), default=None, nargs='?', help='sequence FASTA/FASTQ')
    parser.add_argument('sequence_p2', type=argparse.FileType('rb'), default=None, nargs='?', help='sequence FASTQ pair')
    parser.add_argument("--validate", action="store_true", help="Dry run, validate only")
    parser.add_argument("--skip-qc", action="store_true", help="Skip local qc check")
    parser.add_argument("--trusted", action="store_true", help="Trust local validation and add directly to validated project")
    args = parser.parse_args()

    if args.trusted:
        # Use credentials from environment
        api = arvados.api()
    else:
        api = arvados.api(host=ARVADOS_API_HOST, token=UPLOADER_API_TOKEN, insecure=True)

    # ---- First the QC
    target = qc_stuff(args.metadata, args.sequence_p1, args.sequence_p2, not args.skip_qc)
    if target:
        seqlabel = target[0][1]
    else:
        seqlabel = ""

    if args.validate:
        log.info("Valid")
        exit(0)

    col = arvados.collection.Collection(api_client=api)

    # ---- Upload the sequence to Arvados
    if args.sequence_p1:
        upload_sequence(col, target[0], args.sequence_p1)
        if args.sequence_p2:
            upload_sequence(col, target[1], args.sequence_p2)

    # ---- Make sure the metadata YAML is valid
    log.info("Reading metadata")
    with col.open("metadata.yaml", "w") as f:
        r = args.metadata.read(65536)
        log.info(r[0:20])
        while r:
            f.write(r)
            r = args.metadata.read(65536)

    # ---- Get the uploader IP address (gateway) and local user info
    external_ip = urllib.request.urlopen('https://ident.me').read().decode('utf8')

    try:
        username = getpass.getuser()
    except KeyError:
        username = "unknown"

    properties = {
        "sequence_label": seqlabel,
        "upload_app": "bh20-seq-uploader",
        "upload_ip": external_ip,
        "upload_user": "%s@%s" % (username, socket.gethostname())
    }

    # ---- Get ready for actual uploading
    api2 = arvados.api(host=ARVADOS_API_HOST, token=ANONYMOUS_API_TOKEN, insecure=True)
    dup = api2.collections().list(filters=[["owner_uuid", "in", [VALIDATED_PROJECT, UPLOAD_PROJECT]],
                                           ["portable_data_hash", "=", col.portable_data_hash()]]).execute()
    if dup["items"]:
        # This exact collection has been uploaded before.
        log.error("Duplicate of %s" % ([d["uuid"] for d in dup["items"]]))
        exit(1)

    if args.trusted:
        properties["status"] = "validated"
        owner_uuid = VALIDATED_PROJECT
    else:
        owner_uuid = UPLOAD_PROJECT

    # ---- and stream the 'collection' up
    col.save_new(owner_uuid=owner_uuid, name="%s uploaded by %s from %s" %
                 (seqlabel, properties['upload_user'], properties['upload_ip']),
                 properties=properties, ensure_unique_name=True)

    log.info("Saved to %s" % col.manifest_locator())
    log.info("Done")
    exit(0)

if __name__ == "__main__":
    main()