File: test_util.py

import os.path as op
from io import BytesIO

import h5py
import numpy as np
import pandas as pd
import pytest

from cooler import util

testdir = op.realpath(op.dirname(__file__))
datadir = op.join(testdir, "data")


def test_partition():
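    # partition(start, stop, step) should yield half-open index ranges that
    # tile [0, 9) in steps of 2, with a shorter final chunk.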
    p = list(util.partition(0, 9, 2))
    assert p == [(0, 2), (2, 4), (4, 6), (6, 8), (8, 9)]


def test_buffered():
    a = pd.DataFrame(np.random.random((4, 3)), columns=["a", "b", "c"])
    b = pd.DataFrame(np.random.random((3, 3)), columns=["a", "b", "c"])
    c = pd.DataFrame(np.random.random((3, 3)), columns=["a", "b", "c"])
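    # buffered() should concatenate consecutive frames until at least `size`
    # rows accumulate: a (4 rows) + b (3 rows) -> one 7-row chunk, then c alone.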
    it = util.buffered([a, b, c], size=6)
    assert len(next(it)) == 7
    assert len(next(it)) == 3


def test_rlencode():
    s, l, v = util.rlencode([1, 1, 1, 1, 5, 5, 5, 5, 3, 3, 8, 9, 9])  # noqa
    assert list(s) == [0, 4, 8, 10, 11]
    assert list(l) == [4, 4, 2, 1, 2]
    assert list(v) == [1, 5, 3, 8, 9]

    s, l, v = util.rlencode([])  # noqa
    assert list(s) == []
    assert list(l) == []
    assert list(v) == []


def test_parse_cooler_uri():
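    # '::' separates the file path from the HDF5 group path; a missing or
    # relative group path should normalize to an absolute one ('/').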
    for uri in [
        "/foo/bar/baz.mcool::resolutions/1000",
        "/foo/bar/baz.mcool::/resolutions/1000",
    ]:
        fp, gp = util.parse_cooler_uri(uri)
        assert fp == "/foo/bar/baz.mcool"
        assert gp == "/resolutions/1000"

    for uri in ["/foo/bar/baz.cool", "/foo/bar/baz.cool::/"]:
        fp, gp = util.parse_cooler_uri(uri)
        assert fp == "/foo/bar/baz.cool"
        assert gp == "/"

    for uri in [
        "/foo/bar/baz.cool::/a/b::c.cool",
    ]:
        with pytest.raises(ValueError):
            util.parse_cooler_uri(uri)


def test_atoi():
    assert util.atoi("1,000") == 1000
    with pytest.raises(ValueError):
        util.atoi("1,000.05")  # not an integer


def test_parse_region_string():
    # UCSC-style names
    assert util.parse_region_string("chr21") == ("chr21", None, None)
    assert util.parse_region_string("chr21:1000-2000") == ("chr21", 1000, 2000)
    assert util.parse_region_string("chr21:1,000-2,000") == ("chr21", 1000, 2000)

    # Ensembl style names
    assert util.parse_region_string("6") == ("6", None, None)
    assert util.parse_region_string("6:1000-2000") == ("6", 1000, 2000)
    assert util.parse_region_string("6:1,000-2,000") == ("6", 1000, 2000)

    # FASTA style names
    assert util.parse_region_string("gb|accession|locus") == (
        "gb|accession|locus",
        None,
        None,
    )
    assert util.parse_region_string("gb|accession|locus:1000-2000") == (
        "gb|accession|locus",
        1000,
        2000,
    )
    assert util.parse_region_string("gb|accession|locus:1,000-2,000") == (
        "gb|accession|locus",
        1000,
        2000,
    )

    # Punctuation in names (aside from :)
    assert util.parse_region_string("name-with-hyphens-") == (
        "name-with-hyphens-",
        None,
        None,
    )
    assert util.parse_region_string("GL000207.1") == ("GL000207.1", None, None)
    assert util.parse_region_string("GL000207.1:1000-2000") == (
        "GL000207.1",
        1000,
        2000,
    )

    # Trailing dash
    assert util.parse_region_string("chr21:1000-") == ("chr21", 1000, None)

    # Humanized units
    assert util.parse_region_string("6:1kb-2kb") == ("6", 1000, 2000)
    assert util.parse_region_string("6:1k-2000") == ("6", 1000, 2000)
    assert util.parse_region_string("6:1kb-2M") == ("6", 1000, 2000000)
    assert util.parse_region_string("6:1Gb-") == ("6", 1000000000, None)

    # Bad inputs
    for region in [
        "chr1:2,000-1,000",  # reverse selection
        "chr1::1000-2000",  # more than one colon
        "chr1:1kb-2kDa",  # unknown unit kDa
        "chr1:1000",  # missing end
        "chr1:-2000",  # missing start
        ":1000-2000",  # missing chromosome name
        "chr1:$100-300",  # invalid token
    ]:
        with pytest.raises(ValueError):
            util.parse_region_string(region)


def test_parse_region():
    chromsizes = util.read_chromsizes(op.join(datadir, "toy.chrom.sizes"))
    assert util.parse_region(("chr1", 0, 10)) == ("chr1", 0, 10)
    assert util.parse_region("chr1:0-10") == ("chr1", 0, 10)
    assert util.parse_region("chr1:0-", chromsizes) == ("chr1", 0, chromsizes["chr1"])

    # Don't accept undefined end unless chromsizes exists
    # NOTE: parse_region_string works here
    with pytest.raises(ValueError):
        util.parse_region("chr1:0-")

    # catch end < start in non-string case
    with pytest.raises(ValueError):
        util.parse_region(("chr1", 10, 0))

    # catch errors when chromsizes is given: out-of-bounds coordinates and
    # chromosome names missing from the chromsizes index
    for region in [
        ("chr1", 0, 1000),  # end exceeds the chromosome length
        ("chr1", -5, 10),  # negative start
        ("DoesNotExist", 0, 10),  # unknown chromosome
        "DoesNotExist",  # unknown chromosome, string form
    ]:
        with pytest.raises(ValueError):
            util.parse_region(region, chromsizes)


def test_natsort():
    chroms_alpha = ["chr1", "chr10", "chr2", "chr3"]
    chroms_nat = ["chr1", "chr2", "chr3", "chr10"]
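    # natural sort places chr10 after chr3 rather than lexicographically after
    # chr1; argnatsort returns the corresponding permutation indices.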
    assert util.natsorted(chroms_alpha) == chroms_nat
    assert list(util.argnatsort(chroms_alpha)) == [0, 2, 3, 1]


def test_read_chromsizes():
    # smoke test: should load as a non-empty name -> length mapping
    chromsizes = util.read_chromsizes(op.join(datadir, "toy.chrom.sizes"))
    assert len(chromsizes) > 0


# def test_fetch_chromsizes():
#     util.fetch_chromsizes("hg19")


def test_load_fasta():
    fa = util.load_fasta(["chr1", "chr2"], op.join(datadir, "toy.fasta"))
    assert len(fa["chr1"]) == 32
    assert len(fa["chr2"]) == 32

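    # calling load_fasta without any FASTA file paths should raise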
    with pytest.raises(ValueError):
        util.load_fasta(["chr1", "chr2"])

    # s1 = StringIO(">chr1\nAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA")
    # s2 = StringIO(">chr2\nTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTT")
    # fa = util.load_fasta(['chr1', 'chr2'], s1, s2)
    # assert len(fa['chr1']) == 32
    # assert len(fa['chr2']) == 31


def test_binnify():
    chromsizes = util.read_chromsizes(op.join(datadir, "toy.chrom.sizes"))
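    # The toy genome appears to have two 32 bp chromosomes (see
    # test_load_fasta), so 10 bp bins give 4 bins per chromosome.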
    bins = util.binnify(chromsizes, 10)
    assert len(bins) == 8


def test_digest():
    fa = util.load_fasta(["chr1", "chr2"], op.join(datadir, "toy.fasta"))
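    # The toy sequences presumably contain no HindIII cut site, so digestion
    # yields a single fragment (bin) per chromosome.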
    bins = util.digest(fa, "HindIII")
    assert len(bins) == 2

    with pytest.raises(ValueError):
        util.digest(fa, "HindMCMXCIX")


def test_get_binsize():
    chromsizes = util.read_chromsizes(op.join(datadir, "toy.chrom.sizes"))
    bins = util.binnify(chromsizes, 10)
    assert util.get_binsize(bins) == 10

    # variable-sized bins
    bins = pd.read_csv(
        op.join(datadir, "toy.bins.var.bed"), names=["chrom", "start", "end"], sep="\t"
    )
    assert util.get_binsize(bins) is None

    # ambiguous case: one bin per chromosome with different lengths
    bins = pd.DataFrame(
        {"chrom": ["chr1", "chr2", "chr3"], "start": [0, 0, 0], "end": [100, 200, 300]}
    )
    assert util.get_binsize(bins) is None


def test_get_chromsizes():
    chromsizes = util.read_chromsizes(op.join(datadir, "toy.chrom.sizes"))
    bins = util.binnify(chromsizes, 10)
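    # get_chromsizes should recover the per-chromosome lengths from the bin
    # table, round-tripping the original chromsizes.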
    assert np.allclose(util.get_chromsizes(bins), chromsizes)


def test_bedslice():
    chromsizes = util.read_chromsizes(op.join(datadir, "toy.chrom.sizes"))
    bins = util.binnify(chromsizes, 10)
    grouped = bins.groupby("chrom", observed=True)
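    # bedslice on the chrom-grouped bin table should return the two 10 bp
    # bins overlapping chr1:0-12.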
    df = util.bedslice(grouped, chromsizes, "chr1:0-12")
    assert df["chrom"].tolist() == ["chr1", "chr1"]
    assert df["start"].tolist() == [0, 10]


def test_cmd_exists():
    # cmd_exists should detect a command available on PATH
    assert util.cmd_exists("ls")


def test_mad():
    from scipy.stats import median_abs_deviation

    x = np.arange(50)
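    # util.mad should match scipy's unscaled median absolute deviation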
    assert np.isclose(util.mad(x), median_abs_deviation(x, scale=1))


def test_hdf5_contextmanagers():
    path = op.join(datadir, "toy.symm.upper.2.cool")

    # file path creates managed handle that gets closed on teardown
    with util.open_hdf5(path) as f:
        pass
    assert not f.id

    # allow appendable open file to pass through with mode='r'
    # might be good to raise a warning
    f = h5py.File(path, "r+")
    with util.open_hdf5(f, "r"):
        pass
    assert f.id
    f.close()

    # open file passes through without getting closed on teardown
    f = h5py.File(path, "r")
    with util.open_hdf5(f):
        pass
    assert f.id

    # can't change mode on open file
    with pytest.raises(ValueError):
        with util.open_hdf5(f, "r+"):
            pass

    # not allowed on open files
    for mode in ["w", "w-", "x"]:
        with pytest.raises(ValueError):
            with util.open_hdf5(f, mode):
                pass

    # group's parent file gets closed on teardown
    with util.closing_hdf5(f["chroms"]):
        pass
    assert not f.id

    # closing works as a standalone object, not only as a contextmanager
    f = h5py.File(path, "r")
    grp = util.closing_hdf5(f["chroms"])
    grp.close()
    assert not f.id


def test_hdf5_attrs_to_jsonable_dict():
    b = BytesIO()
    f = h5py.File(b, "a")
    f.attrs["a"] = np.array([1, 2, 3])
    f.attrs["b"] = "hello"
    f.attrs["c"] = 3
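    # attrs_to_jsonable should coerce numpy arrays and scalars to plain
    # JSON-serializable Python types.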
    dct = util.attrs_to_jsonable(f.attrs)
    assert dct["a"] == [1, 2, 3]
    assert dct["b"] == "hello"
    assert dct["c"] == 3


def test_check_bins():
    chromsizes = util.read_chromsizes(op.join(datadir, "toy.chrom.sizes"))
    bins = util.binnify(chromsizes, 10)
    bins["chrom"] = bins["chrom"].astype(str)
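    # check_bins should re-cast the string 'chrom' column as categorical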
    bins = util.check_bins(bins, chromsizes)
    assert isinstance(bins["chrom"].dtype, pd.CategoricalDtype)


def test_genome_segmentation():
    chromsizes = util.read_chromsizes(op.join(datadir, "toy.chrom.sizes"))
    bins = util.binnify(chromsizes, 10)
    gs = util.GenomeSegmentation(chromsizes, bins)
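    # fetch returns the bins overlapping the query: all 4 chr1 bins, then the
    # 3 bins overlapping chr1:2-30.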
    df = gs.fetch("chr1")
    assert len(df) == 4
    df = gs.fetch("chr1:2-30")
    assert len(df) == 3
    util.balanced_partition(gs, 2, ["chr1"])


def test_dataframe_meta():
    df = pd.DataFrame({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]})
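    # infer_meta presumably builds an empty, dtype-preserving frame
    # (dask-style metadata); this is just a smoke test.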
    util.infer_meta(df)
    # meta2 = util.get_meta(df.columns, df.dtypes)
    # assert meta1 == meta2