File: test_gzip_support.py

package info (click to toggle)
python-pybedtools 0.10.0-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 16,620 kB
  • sloc: python: 10,030; cpp: 899; makefile: 142; sh: 57
file content (121 lines) | stat: -rw-r--r-- 3,729 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import os
import tempfile
import pybedtools.test.tfuncs as tfuncs

import pybedtools
import gzip


def teardown_module():
    """Module-level teardown hook; delegates to ``pybedtools.cleanup()``."""
    # Presumably removes the temp files created by the tests in this module.
    pybedtools.cleanup()

def _make_temporary_gzip(bed_filename):
    """
    Make a gzip-compressed copy of a BED file on the fly.

    The output path is obtained from ``pybedtools.BedTool._tmp()`` while the
    global ``pybedtools.settings.tempfile_suffix`` is temporarily switched to
    ``".gz"``, so the generated temp filename ends in ``.gz``.

    :param bed_filename: Filename of bed file to gzip
    :return: filename of gzipped file
    """
    orig_suffix = pybedtools.settings.tempfile_suffix
    pybedtools.settings.tempfile_suffix = ".gz"
    try:
        gz_filename = pybedtools.BedTool._tmp()
    finally:
        # Always restore the global setting, even if _tmp() raises, so a
        # failure here cannot leak the ".gz" suffix into unrelated tests.
        pybedtools.settings.tempfile_suffix = orig_suffix

    # Copy the raw bytes line by line into the gzip stream.
    with gzip.open(gz_filename, "wb") as out_, open(bed_filename, "rb") as in_:
        out_.writelines(in_)
    return gz_filename


def test_gzipped_file_types_are_bed():
    """A gzipped copy of a.bed reports ``file_type`` of ``"bed"``."""
    gz_path = _make_temporary_gzip(pybedtools.example_filename("a.bed"))
    gz_bedtool = pybedtools.BedTool(gz_path)

    assert gz_bedtool.file_type == "bed"


def test_gzipped_files_can_be_intersected():
    """Intersecting a.bed with b.bed agrees for every plain/gzipped pairing."""
    # Build both gzipped copies first, then wrap them, as two separate steps.
    gz_paths = [
        _make_temporary_gzip(pybedtools.example_filename(name))
        for name in ("a.bed", "b.bed")
    ]
    a_gz, b_gz = [pybedtools.BedTool(path) for path in gz_paths]

    a_plain, b_plain = [
        pybedtools.example_bedtool(name) for name in ("a.bed", "b.bed")
    ]

    # plain/plain == gz/gz == plain/gz == gz/plain
    assert (
        a_plain.intersect(b_plain)
        == a_gz.intersect(b_gz)
        == a_plain.intersect(b_gz)
        == a_gz.intersect(b_plain)
    )


def test_gzipped_files_are_iterable_as_normal():
    """Iterating a gzipped BedTool yields the same items as the plain file."""
    gz_bedtool = pybedtools.BedTool(
        _make_temporary_gzip(pybedtools.example_filename("a.bed"))
    )
    plain_bedtool = pybedtools.example_bedtool("a.bed")

    # Walk the gzipped file once, printing each item as it is yielded.
    for feature in gz_bedtool:
        print(feature)

    assert list(plain_bedtool) == list(gz_bedtool)


def test_str_representation_of_gzipped_files_is_the_same_as_normal():
    """``str()`` of a gzipped BedTool equals ``str()`` of the plain one."""
    gz_bedtool = pybedtools.BedTool(
        _make_temporary_gzip(pybedtools.example_filename("a.bed"))
    )
    plain_text = str(pybedtools.example_bedtool("a.bed"))

    assert plain_text == str(gz_bedtool)


def test_head_of_gzipped_files_is_the_same_as_normal():
    """``head()`` of a gzipped BedTool matches ``head()`` of the plain file."""
    agz = _make_temporary_gzip(pybedtools.example_filename("a.bed"))
    agz = pybedtools.BedTool(agz)
    a = pybedtools.example_bedtool("a.bed")

    # Per the pybedtools docs, head() prints the lines and returns nothing by
    # default, so this comparison mainly checks that the printing path does
    # not raise for gzipped input.
    assert agz.head() == a.head()

    # Compare the actual head content; as_string=True makes head() return the
    # lines instead of printing them.
    assert agz.head(as_string=True) == a.head(as_string=True)


def test_gzipped_output():
    """``saveas(compressed=True)`` output decompresses to the original text."""
    source_path = pybedtools.example_filename("a.bed")
    saved = pybedtools.BedTool(source_path).saveas(compressed=True)

    with open(source_path) as plain_handle:
        expected_text = plain_handle.read()

    # Read the saved file back through gzip in text mode.
    with gzip.open(saved.fn, "rt") as gz_handle:
        decompressed_text = gz_handle.read()

    assert expected_text == decompressed_text


def test_gzipping_is_default_when_extension_is_dot_gz():
    """``saveas`` to a ``.gz`` filename writes gzip-readable output."""
    _filename = pybedtools.example_filename("a.bed")
    with open(_filename) as f:
        expected_content = f.read()

    fd, temp_filename = tempfile.mkstemp(suffix=".gz")
    try:
        # mkstemp returns an open OS-level descriptor; only the path is needed
        # here, so close it right away instead of leaking it.
        os.close(fd)

        bedtool = pybedtools.BedTool(_filename)
        bedtool.saveas(fn=temp_filename)

        with gzip.open(temp_filename, "rt") as gf:
            # gzip will fail next line if file is not gzipped
            actual_content = gf.read()

        assert expected_content == actual_content
    finally:
        if os.path.isfile(temp_filename):
            os.unlink(temp_filename)


def test_gzipping_can_be_turned_off_even_for_dot_gz():
    """``saveas(..., compressed=False)`` writes plain text to a ``.gz`` name."""
    _filename = pybedtools.example_filename("a.bed")
    with open(_filename) as f:
        expected_content = f.read()

    fd, temp_filename = tempfile.mkstemp(suffix=".gz")
    try:
        # mkstemp returns an open OS-level descriptor; only the path is needed
        # here, so close it right away instead of leaking it.
        os.close(fd)

        bedtool = pybedtools.BedTool(_filename)
        bedtool.saveas(fn=temp_filename, compressed=False)

        with open(temp_filename) as non_gz_f:
            # Read as plain text: if the file had been gzip-compressed anyway,
            # this would not reproduce the original content.
            actual_content = non_gz_f.read()

        assert expected_content == actual_content
    finally:
        if os.path.isfile(temp_filename):
            os.unlink(temp_filename)