aboutsummaryrefslogtreecommitdiff
path: root/bh20sequploader/qc_fasta.py
blob: 37eb4e86ea832610fe87a2317dc3bfbb4b66062e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import pkg_resources
import tempfile
import magic
import subprocess
import tempfile
import logging
import re
import io
import gzip

log = logging.getLogger(__name__ )

def read_fasta(sequence):
    entries = 0
    bases = []
    label = None
    for line in sequence:
        if line.startswith(">"):
            label = line
            entries += 1
        else:
            bases.append(line)
        if entries > 1:
            log.debug("FASTA file contains multiple entries")
            raise ValueError("FASTA file contains multiple entries")
    return label, bases

def qc_fasta(arg_sequence, check_with_clustalw=True):
    log.debug("Starting qc_fasta")
    schema_resource = pkg_resources.resource_stream(__name__, "validation/formats")
    with tempfile.NamedTemporaryFile() as tmp:
        tmp.write(schema_resource.read())
        tmp.flush()
        val = magic.Magic(magic_file=tmp.name,
                          uncompress=False, mime=True)

    gz = ""
    if arg_sequence.name.endswith(".gz"):
        sequence = gzip.GzipFile(fileobj=arg_sequence, mode='rb')
        gz = ".gz"
    else:
        sequence = arg_sequence

    sequence = io.TextIOWrapper(sequence)
    r = sequence.read(4096)
    sequence.seek(0)

    seqlabel = r[1:r.index("\n")]
    seq_type = val.from_buffer(r).lower()

    if seq_type == "text/fasta":
        # ensure that contains only one entry
        submitlabel, submitseq = read_fasta(sequence)
        sequence.seek(0)
        sequence.detach()

        if not check_with_clustalw:
            return ("sequence.fasta"+gz, seqlabel)

        with tempfile.NamedTemporaryFile() as tmp1:
            with tempfile.NamedTemporaryFile() as tmp2:
                refstring = pkg_resources.resource_string(__name__, "SARS-CoV-2-reference.fasta")
                tmp1.write(refstring)
                tmp1.flush()
                tmp2.write(submitlabel.encode("utf8"))
                tmp2.write(("".join(submitseq)).encode("utf8"))
                tmp2.flush()
                subbp = 0
                refbp = 0
                similarity = 0
                try:
                    cmd = ["minimap2", "-c", tmp1.name, tmp2.name]
                    logging.info("QC checking similarity to reference")
                    logging.info(" ".join(cmd))
                    result = subprocess.run(cmd, stdout=subprocess.PIPE)
                    result.check_returncode()
                    res = result.stdout.decode("utf-8")
                    mm = res.split("\t")
                    if len(mm) >= 10:
                        # divide Number of matching bases in the mapping / Target sequence length
                        similarity = (float(mm[9]) / float(mm[6])) * 100.0
                    else:
                        similarity = 0
                except Exception as e:
                    logging.warn("QC against reference sequence using 'minimap2': %s", e, exc_info=e)

                if similarity and similarity < 70.0:
                    raise ValueError("QC fail: alignment to reference was less than 70%% (was %2.2f%%)" % (similarity))
                if similarity == 0:
                    raise ValueError("QC fail")

        return ("sequence.fasta"+gz, seqlabel)
    elif seq_type == "text/fastq":
        sequence.seek(0)
        sequence.detach()
        return ("reads.fastq"+gz, seqlabel)
    else:
        raise ValueError("Sequence file does not look like a DNA FASTA or FASTQ")