aboutsummaryrefslogtreecommitdiff
path: root/bh20sequploader/qc_fasta.py
blob: 814fb3eea320925e3a15233b71000760e7308742 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import pkg_resources
import tempfile
import magic
import subprocess
import tempfile
import logging
import re
import io
import gzip

log = logging.getLogger(__name__ )

def read_fasta(sequence):
    entries = 0
    bases = []
    label = None
    for line in sequence:
        if line.startswith(">"):
            label = line
            entries += 1
        else:
            bases.append(line)
        if entries > 1:
            log.debug("FASTA file contains multiple entries")
            raise ValueError("FASTA file contains multiple entries")
    return label, bases

def qc_fasta(arg_sequence, check_with_mimimap2=True):
    log.debug("Starting qc_fasta")
    schema_resource = pkg_resources.resource_stream(__name__, "validation/formats")
    with tempfile.NamedTemporaryFile() as tmp:
        tmp.write(schema_resource.read())
        tmp.flush()
        val = magic.Magic(magic_file=tmp.name,
                          uncompress=False, mime=True)

    gz = ""
    if arg_sequence.name.endswith(".gz"):
        sequence = gzip.GzipFile(fileobj=arg_sequence, mode='rb')
        gz = ".gz"
    else:
        sequence = arg_sequence

    sequence = io.TextIOWrapper(sequence)
    r = sequence.read(4096)
    sequence.seek(0)

    seqlabel = r[1:r.index("\n")]
    seq_type = val.from_buffer(r).lower()

    if seq_type == "text/fasta":
        # ensure that contains only one entry
        submitlabel, submitseq = read_fasta(sequence)
        sequence.seek(0)
        sequence.detach()

        if check_with_mimimap2:
            with tempfile.NamedTemporaryFile() as tmp1:
                with tempfile.NamedTemporaryFile() as tmp2:
                    refstring = pkg_resources.resource_string(__name__, "SARS-CoV-2-reference.fasta")
                    tmp1.write(refstring)
                    tmp1.flush()
                    tmp2.write(submitlabel.encode("utf8"))
                    tmp2.write(("".join(submitseq)).encode("utf8"))
                    tmp2.flush()

                    similarity = 0
                    try:
                        log.debug("Trying to run minimap2")
                        cmd = ["minimap2", "-c", "-x", "asm20", tmp1.name, tmp2.name]
                        logging.info("QC checking similarity to reference")
                        logging.info(" ".join(cmd))
                        result = subprocess.run(cmd, stdout=subprocess.PIPE)
                        result.check_returncode()
                        res = result.stdout.decode("utf-8")
                        mm = res.split("\t")
                        if len(mm) >= 10:
                            # divide Number of matching bases in the mapping / Target sequence length
                            similarity = (float(mm[9]) / float(mm[6])) * 100.0
                        else:
                            similarity = 0
                    except Exception as e:
                        logging.warn("QC against reference sequence using 'minimap2': %s", e, exc_info=e)

                    if similarity < 70.0:
                        raise ValueError(
                            f"QC fail for {seqlabel}: alignment to reference was less than 70% (was {similarity})")

        return "sequence.fasta" + gz, seqlabel, seq_type
    elif seq_type == "text/fastq":
        sequence.seek(0)
        sequence.detach()
        return "reads.fastq" + gz, seqlabel, seq_type
    else:
        log.debug(seqlabel)
        log.debug(seq_type)
        raise ValueError("Sequence file ({}) does not look like a DNA FASTA or FASTQ".format(arg_sequence))