File: binned_array_tests.py

package info (click to toggle)
python-bx 0.13.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 5,000 kB
  • sloc: python: 17,136; ansic: 2,326; makefile: 24; sh: 8
file content (131 lines) | stat: -rw-r--r-- 3,896 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
"""
Tests for `bx.binned_array`.
"""

import pytest
from numpy import (
    allclose,
    concatenate,
    nan,
    zeros,
)
from numpy.random import default_rng

from bx.binned_array import (
    BinnedArray,
    BinnedArrayWriter,
    FileBinnedArray,
)

# Lengths of the chunks that `source_target` glues together to build the test
# data. Bigger values take longer, but exercise more bins.
CHUNK_SIZE_RANDOM = 945
CHUNK_SIZE_ZEROS = 897
# Larger alternative values, kept commented out:
# CHUNK_SIZE_RANDOM=9456
# CHUNK_SIZE_ZEROS=8972

# Random generator shared by the fixture and the tests. It is created without
# a seed, so the generated data and the random slice bounds differ per run.
rng = default_rng()


@pytest.fixture(scope="module")
def source_target():
    """Build the reference data once per module.

    Returns a ``(source, target)`` pair: ``source`` is a float32 array made of
    13 chunks, each either ``CHUNK_SIZE_RANDOM`` random values or
    ``CHUNK_SIZE_ZEROS`` zeros (chosen by a coin flip), and ``target`` is a
    ``BinnedArray`` that had every element of ``source`` assigned to it.
    """
    # Start from an empty piece so the joined array is float64 before the
    # final cast, regardless of which chunk kinds were drawn.
    pieces = [[]]
    for _ in range(13):
        use_random = rng.random() < 0.5
        if use_random:
            pieces.append(rng.random(CHUNK_SIZE_RANDOM))
        else:
            pieces.append(zeros(CHUNK_SIZE_ZEROS, "f"))
    source = concatenate(pieces).astype("f")
    # presumably (bin size, default value, maximum size) -- TODO confirm
    # against BinnedArray's signature
    target = BinnedArray(128, nan, len(source))
    # Copy every value over, zeros included
    for position, value in enumerate(source):
        target[position] = value
    return source, target


def test_simple(source_target):
    """Check that the in-memory binned array reproduces the source data.

    Every index is compared for exact equality, then ten random slices are
    compared with ``allclose``.
    """
    source, target = source_target
    size = len(source)
    # Element-by-element comparison
    for idx, expected in enumerate(source):
        assert expected == target[idx], "No match, index: %d, source: %f, target: %f, len( source ): %d" % (
            idx,
            expected,
            target[idx],
            size,
        )
    # Random slice comparison; the two bounds are drawn in order and then
    # arranged so that start <= stop
    for _ in range(10):
        start, stop = sorted((int(rng.random() * size), int(rng.random() * size)))
        expected_preview = ",".join(str(v) for v in source[start : start + 10])
        assert allclose(source[start:stop], target[start:stop]), "No match, index: %d:%d, source: %s, target: %s" % (
            start,
            stop,
            expected_preview,
            ",".join(str(v) for v in target[start : start + 10]),
        )


def test_file(source_target):
    """Round-trip the binned array through a file and read it back.

    The array is written with ``to_file`` using its default compression
    (zlib, per the original comment), then read through ``FileBinnedArray``.
    Every index is compared for exact equality and ten random slices are
    compared with ``allclose``.

    The file lives in a private temporary directory so that concurrent test
    runs cannot collide and nothing is left behind, and every handle is closed
    deterministically.
    """
    import os
    import tempfile

    source, target = source_target
    size = len(source)
    with tempfile.TemporaryDirectory() as tmp_dir:
        path = os.path.join(tmp_dir, "foo")
        # With a file (zlib); closing the handle guarantees the data is
        # flushed to disk before it is read back
        with open(path, "wb") as out:
            target.to_file(out)
        # Verify element by element; the handle stays open while the
        # file-backed array is being read
        with open(path, "rb") as handle:
            target2 = FileBinnedArray(handle)
            for i, expected in enumerate(source):
                assert expected == target2[i], "No match, index: %d, source: %f, target: %f" % (
                    i,
                    expected,
                    target2[i],
                )
        # Verify with slices, on a freshly opened file-backed array
        with open(path, "rb") as handle:
            target2 = FileBinnedArray(handle)
            for _ in range(10):
                a = int(rng.random() * size)
                b = int(rng.random() * size)
                if b < a:
                    a, b = b, a
                # Compare against the file-backed array, not the in-memory one
                assert allclose(source[a:b], target2[a:b]), "No match, index: %d:%d, source: %s, target: %s" % (
                    a,
                    b,
                    ",".join(map(str, source[a : a + 10])),
                    ",".join(map(str, target2[a : a + 10])),
                )


def test_file_lzo(source_target):
    """Round-trip the binned array through a file written with ``comp_type="lzo"``.

    Every index is compared for exact equality and ten random slices are
    compared with ``allclose`` against the file-backed array.

    The file lives in a private temporary directory and every handle is closed
    deterministically.

    NOTE(review): presumably this needs LZO support to be available to
    ``bx.binned_array`` -- confirm how the suite should behave without it.
    """
    import os
    import tempfile

    source, target = source_target
    size = len(source)
    with tempfile.TemporaryDirectory() as tmp_dir:
        path = os.path.join(tmp_dir, "foo3")
        # With a file (lzo); closing the handle guarantees the data is
        # flushed to disk before it is read back
        with open(path, "wb") as out:
            target.to_file(out, comp_type="lzo")
        # Verify element by element
        with open(path, "rb") as handle:
            target3 = FileBinnedArray(handle)
            for i, expected in enumerate(source):
                assert expected == target3[i], "No match, index: %d, source: %f, target: %f" % (
                    i,
                    expected,
                    target3[i],
                )
        # Verify with slices, on a freshly opened file-backed array
        with open(path, "rb") as handle:
            target3 = FileBinnedArray(handle)
            for _ in range(10):
                a = int(rng.random() * size)
                b = int(rng.random() * size)
                if b < a:
                    a, b = b, a
                assert allclose(source[a:b], target3[a:b]), "No match, index: %d:%d, source: %s, target: %s" % (
                    a,
                    b,
                    ",".join(map(str, source[a : a + 10])),
                    ",".join(map(str, target3[a : a + 10])),
                )


def test_binned_array_writer(source_target):
    """Stream the source values through ``BinnedArrayWriter`` and read them back.

    Each value of ``source`` is passed to ``write`` in order, ``finish`` is
    called, and the resulting file is read through ``FileBinnedArray`` and
    compared index by index with ``allclose``.

    The file lives in a private temporary directory, and the output handle is
    closed even if writing fails.
    """
    import os
    import tempfile

    source, _target = source_target
    with tempfile.TemporaryDirectory() as tmp_dir:
        path = os.path.join(tmp_dir, "foo4")
        # Test with ba writer
        with open(path, "wb") as out:
            # 128 is presumably the bin size -- TODO confirm against
            # BinnedArrayWriter's signature
            writer = BinnedArrayWriter(out, 128, comp_type="lzo")
            for val in source:
                writer.write(val)
            writer.finish()
        # Verify
        with open(path, "rb") as handle:
            target4 = FileBinnedArray(handle)
            for i, expected in enumerate(source):
                assert allclose(expected, target4[i]), "No match, index: %d, source: %f, target: %f" % (
                    i,
                    expected,
                    target4[i],
                )