about summary refs log tree commit diff
path: root/tests/test_serialization.py
blob: a473796c496ebc73da7f1b6855fb6b75dc109c2e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
### pyhegp --- Homomorphic encryption of genotypes and phenotypes
### Copyright © 2025 Arun Isaac <arunisaac@systemreboot.net>
###
### This file is part of pyhegp.
###
### pyhegp is free software: you can redistribute it and/or modify it
### under the terms of the GNU General Public License as published by
### the Free Software Foundation, either version 3 of the License, or
### (at your option) any later version.
###
### pyhegp is distributed in the hope that it will be useful, but
### WITHOUT ANY WARRANTY; without even the implied warranty of
### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
### General Public License for more details.
###
### You should have received a copy of the GNU General Public License
### along with pyhegp. If not, see <https://www.gnu.org/licenses/>.

import tempfile

from hypothesis import given, strategies as st
from hypothesis.extra.numpy import arrays, array_shapes
from hypothesis.extra.pandas import column, columns, data_frames
import pandas as pd
from pytest import approx

from pyhegp.serialization import Summary, read_summary, write_summary, read_summary_headers, read_genotype, write_genotype, read_key, write_key
from pyhegp.utils import negate

# Alphabet of ASCII characters with the control-character category
# (Cc) excluded; tab is additionally excluded by name.
_tabless_printable_ascii = st.characters(
    codec="ascii",
    exclude_categories=("Cc",),
    exclude_characters=("\t",),
)
# Non-empty strings over the alphabet above.
tabless_printable_ascii_text = st.text(_tabless_printable_ascii, min_size=1)

# Alphabet consisting only of the characters A, G, C and T.
_nucleotides = st.characters(
    codec="ascii",
    categories=(),
    include_characters=("A", "G", "C", "T"),
)

# Column specifications reused by the data frame strategies in this
# file.
chromosome_column = column(
    name="chromosome",
    dtype="str",
    elements=tabless_printable_ascii_text,
)
position_column = column(name="position", dtype="int")
reference_column = column(
    name="reference",
    dtype="str",
    elements=st.text(_nucleotides, min_size=1),
)

@st.composite
def summaries(draw):
    """Draw a Summary for the serialization round-trip test.

    The Summary is constructed positionally from an unbounded integer
    and a data frame. The data frame always has the chromosome and
    position columns, has the reference column only when a drawn
    boolean says so, and ends with float64 "mean" and "std" columns
    whose elements are never NaN.
    """
    n = draw(st.integers())
    leading_columns = [chromosome_column, position_column]
    # The reference column is optional; let hypothesis decide.
    if draw(st.booleans()):
        leading_columns.append(reference_column)
    statistics_columns = columns(["mean", "std"],
                                 dtype="float64",
                                 elements=st.floats(allow_nan=False))
    frame = draw(data_frames(columns=leading_columns + statistics_columns))
    return Summary(n, frame)

@given(summaries())
def test_read_write_summary_are_inverses(summary):
    """Check that a summary written to a file is read back unchanged.

    Both the data frame (summary.data) and the integer (summary.n)
    must survive the round trip.
    """
    with tempfile.TemporaryFile() as stream:
        write_summary(stream, summary)
        # Rewind so that reading starts at the beginning of what was
        # just written.
        stream.seek(0)
        roundtripped = read_summary(stream)
        pd.testing.assert_frame_equal(summary.data, roundtripped.data)
        assert summary.n == roundtripped.n

@st.composite
def properties_and_whitespace(draw):
    """Draw header properties along with amounts of whitespace.

    A count n between 0 and 10 is drawn first. The result is a
    3-tuple of

    - a dictionary with exactly n entries. Keys are non-empty ASCII
      strings without characters of categories Cc or Zs, and that do
      not start with "#". Values are non-empty ASCII strings without
      characters of category Cc.
    - an integer between 0 and 10.
    - a list of exactly n integers, each between 0 and 10.
    """
    count = draw(st.integers(min_value=0, max_value=10))

    key_characters = st.characters(codec="ascii",
                                   exclude_categories=("Cc", "Zs"))
    keys = (st.text(key_characters, min_size=1)
            .filter(lambda key: not key.startswith("#")))
    value_characters = st.characters(codec="ascii",
                                     exclude_categories=("Cc",))
    values = st.text(value_characters, min_size=1)
    padding = st.integers(min_value=0, max_value=10)

    properties = draw(st.dictionaries(keys, values,
                                      min_size=count, max_size=count))
    header_padding = draw(padding)
    line_paddings = draw(st.lists(padding, min_size=count, max_size=count))
    return properties, header_padding, line_paddings

@given(properties_and_whitespace())
def test_read_summary_headers_variable_whitespace(properties_and_whitespace):
    """Check that summary headers are read regardless of padding.

    A header is written with a varying number of spaces after the
    leading "#" of the version line and of every property line. The
    properties read back must equal the ones written.
    """
    properties, header_whitespace, key_value_whitespace = properties_and_whitespace
    with tempfile.TemporaryFile() as stream:
        stream.write(b"#"
                     + b" " * header_whitespace
                     + b"pyhegp summary file version 1\n")
        # Despite its name, each entry of key_value_whitespace is the
        # number of spaces between "#" and the key; key and value are
        # always separated by a single space.
        for (key, value), padding in zip(properties.items(),
                                         key_value_whitespace):
            stream.write(b"#"
                         + b" " * padding
                         + f"{key} {value}\n".encode("ascii"))
        # Rewind before reading the headers back.
        stream.seek(0)
        assert properties == read_summary_headers(stream)

def genotype_reserved_column_name_p(name):
    """Return True if name is a reserved genotype column name.

    The reserved names are "chromosome", "position" and "reference";
    the comparison ignores case.
    """
    reserved_names = ("chromosome", "position", "reference")
    lowered = name.lower()
    return lowered in reserved_names

# Strategy for lists of unique sample names. Each name is drawn from
# tabless_printable_ascii_text and filtered with
# negate(genotype_reserved_column_name_p) -- presumably so that names
# colliding with the reserved genotype columns are rejected; negate
# comes from pyhegp.utils and is not visible here.
sample_names = st.lists(tabless_printable_ascii_text
                        .filter(negate(genotype_reserved_column_name_p)),
                        unique=True)

@st.composite
def genotype_frames(draw):
    """Draw a genotype data frame for the serialization round-trip test.

    The frame always has the chromosome and position columns, has the
    reference column only when a drawn boolean says so, and ends with
    one float64 column (never NaN) per name drawn from sample_names.
    """
    metadata_columns = [chromosome_column, position_column]
    # The reference column is optional; let hypothesis decide.
    if draw(st.booleans()):
        metadata_columns.append(reference_column)
    sample_columns = columns(draw(sample_names),
                             dtype="float64",
                             elements=st.floats(allow_nan=False))
    return draw(data_frames(columns=metadata_columns + sample_columns))

@given(genotype_frames())
def test_read_write_genotype_are_inverses(genotype):
    """Check that a genotype frame written to a file is read back equal."""
    with tempfile.TemporaryFile() as stream:
        write_genotype(stream, genotype)
        # Rewind so that reading starts at the beginning of what was
        # just written.
        stream.seek(0)
        recovered = read_genotype(stream)
        pd.testing.assert_frame_equal(genotype, recovered)

@given(arrays("float64",
              array_shapes(min_dims=2, max_dims=2)))
def test_read_write_key_are_inverses(key):
    """Check that a key written to a file is read back approximately equal.

    The key is a two-dimensional float64 array. The comparison is
    approximate, and NaN entries are accepted as equal to each other.
    """
    with tempfile.TemporaryFile() as stream:
        write_key(stream, key)
        # Rewind so that reading starts at the beginning of what was
        # just written.
        stream.seek(0)
        recovered = read_key(stream)
        assert key == approx(recovered, nan_ok=True)