File: bigwig_file.pyx

package info (click to toggle)
python-bx 0.13.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 5,000 kB
  • sloc: python: 17,136; ansic: 2,326; makefile: 24; sh: 8
file content (227 lines) | stat: -rw-r--r-- 8,023 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
"""
BigWig file.
"""

import zlib
from collections import deque
from io import BytesIO

import numpy

from bx.misc.binary_file import BinaryFileReader

cimport numpy

from .bbi_file cimport (
    BBIFile,
    BlockHandler,
    SummarizedData,
)
from .cirtree_file cimport CIRTreeFile
from .types cimport (
    bits8,
    bits16,
    bits32,
    UBYTE,
)

DEF big_wig_sig = 0x888FFC26
DEF bwg_bed_graph = 1
DEF bwg_variable_step = 2
DEF bwg_fixed_step = 3

cdef inline int range_intersection( int start1, int end1, int start2, int end2 ):
    return min( end1, end2 ) - max( start1, start2 )

cdef class BigWigBlockHandler( BlockHandler ):
    """
    BlockHandler that parses the block into a series of wiggle records, and calls `handle_interval_value` for each.
    """
    cdef bits32 start
    cdef bits32 end
    def __init__( self, bits32 start, bits32 end ):
        BlockHandler.__init__( self )
        self.start = start
        self.end = end
    cdef handle_block( self, bytes block_data, BBIFile bbi_file ):
        cdef bits32 b_chrom_id, b_start, b_end, b_valid_count
        cdef bits32 b_item_step, b_item_span
        cdef bits16 b_item_count
        cdef UBYTE b_type
        cdef int s, e
        cdef float val
        # Now we parse the block, first the header
        block_reader = BinaryFileReader( BytesIO( block_data ), is_little_endian=bbi_file.reader.is_little_endian )
        b_chrom_id = block_reader.read_uint32()
        b_start = block_reader.read_uint32()
        b_end = block_reader.read_uint32()
        b_item_step = block_reader.read_uint32()
        b_item_span = block_reader.read_uint32()
        b_type = block_reader.read_uint8()
        block_reader.skip(1)
        b_item_count = block_reader.read_uint16()
        for i from 0 <= i < b_item_count:
            # Depending on the type, s and e are either read or 
            # generate using header, val is always read
            if b_type == bwg_bed_graph: 
                s = block_reader.read_uint32()
                e = block_reader.read_uint32()
                val = block_reader.read_float()
            elif b_type == bwg_variable_step:
                s = block_reader.read_uint32()
                e = s + b_item_span
                val = block_reader.read_float()
            elif b_type == bwg_fixed_step:
                s = b_start + ( i * b_item_span )
                e = s + b_item_span
                val = block_reader.read_float()
            else:
                # FIXME: raise exception???
                #        s, e, val are uninitialized/not updated at this point!
                pass
            if s < self.start: 
                s = self.start
            if e > self.end: 
                e = self.end
            if s >= e: 
                continue
            self.handle_interval_value( s, e, val )

    cdef handle_interval_value( self, bits32 s, bits32 e, float val ):
        pass

cdef class SummarizingBlockHandler( BigWigBlockHandler ):
    """
    Accumulates intervals into a SummarizedData
    """
    cdef SummarizedData sd
    def __init__( self, bits32 start, bits32 end, int summary_size ):
        BigWigBlockHandler.__init__( self, start, end )
        # What we will load into
        self.sd = SummarizedData( start, end, summary_size )
        for i in range(summary_size):
            self.sd.min_val[i] = +numpy.inf
        for i in range(summary_size):
            self.sd.max_val[i] = -numpy.inf

    cdef handle_interval_value( self, bits32 s, bits32 e, float val ):
        self.sd.accumulate_interval_value( s, e, val )

cdef class IntervalAccumulatingBlockHandler( BigWigBlockHandler ):
    cdef list intervals
    """
    Accumulates intervals into a list of intervals with values
    """
    def __init__( self, bits32 start, bits32 end ):
        BigWigBlockHandler.__init__( self, start, end )
        self.intervals = []

    cdef handle_interval_value( self, bits32 s, bits32 e, float val ):
        self.intervals.append( ( s, e, val ) )

cdef class ArrayAccumulatingBlockHandler( BigWigBlockHandler ):
    """
    Accumulates intervals into a list of intervals with values
    """
    cdef numpy.ndarray array
    def __init__( self, bits32 start, bits32 end ):
        BigWigBlockHandler.__init__( self, start, end )
        self.array = numpy.zeros( end - start, dtype=numpy.float32 )
        self.array[...] = numpy.nan

    cdef handle_interval_value( self, bits32 s, bits32 e, float val ):
        cdef numpy.ndarray[ numpy.float32_t, ndim=1 ] array = self.array
        cdef int i
        # Slicing is not optimized by Cython
        for i from s - self.start <= i < e - self.start:
            array[ i ] = val

cdef class BigWigHeaderBlockHandler( BigWigBlockHandler ):
    "Reads and returns headers"
    cdef list headers

    def __init__( self, bits32 start, bits32 end ):
        BigWigBlockHandler.__init__( self, start, end )
        self.headers = []

    cdef handle_block( self, bytes block_data, BBIFile bbi_file ):
        cdef bits32 b_chrom_id, b_start, b_end, b_valid_count
        cdef bits32 b_item_step, b_item_span
        cdef bits16 b_item_count
        cdef UBYTE b_type
        cdef int s, e
        cdef float val
        # parse the block header
        block_reader = BinaryFileReader( BytesIO( block_data ), is_little_endian=bbi_file.reader.is_little_endian )
        b_chrom_id = block_reader.read_uint32()
        b_start = block_reader.read_uint32()
        b_end = block_reader.read_uint32()
        b_item_step = block_reader.read_uint32()
        b_item_span = block_reader.read_uint32()
        b_type = block_reader.read_uint8()
        block_reader.skip(1)
        b_item_count = block_reader.read_uint16()
        self.handle_header( b_start, b_end,  b_item_step, b_item_span, b_type, b_item_count )

    cdef handle_header( self, bits32 start, bits32 end, bits32 step, bits32 span, bits8 type, bits16 itemCount ):
        self.headers.append( ( start, end, step, span, type, itemCount ) )

cdef class BigWigFile( BBIFile ):
    """
    A "big binary indexed" file whose raw data is in wiggle format.
    """
    def __init__( self, file=None ):
        BBIFile.__init__( self, file, big_wig_sig, "bigwig" )

    cdef _summarize_from_full( self, bits32 chrom_id, bits32 start, bits32 end, int summary_size ):
        """
        Create summary from full data.
        """
        v = SummarizingBlockHandler( start, end, summary_size )
        self.visit_blocks_in_region( chrom_id, start, end, v )
        # Round valid count, in place
        for i from 0 <= i < summary_size:
            v.sd.valid_count[i] = round( v.sd.valid_count[i] )
        return v.sd

    cpdef get( self, char * chrom, bits32 start, bits32 end ):
        """
        Gets all data points over the regions `chrom`:`start`-`end`.
        """
        if start >= end:
            return None
        chrom_id, chrom_size = self._get_chrom_id_and_size( chrom )
        if chrom_id is None:
            return None
        v = IntervalAccumulatingBlockHandler( start, end )
        self.visit_blocks_in_region( chrom_id, start, end, v )
        return v.intervals

    cpdef get_as_array( self, char * chrom, bits32 start, bits32 end ):
        """
        Gets all data points over the regions `chrom`:`start`-`end`.
        """
        if start >= end:
            return None
        chrom_id, chrom_size = self._get_chrom_id_and_size( chrom )
        if chrom_id is None:
            return None
        v = ArrayAccumulatingBlockHandler( start, end )
        self.visit_blocks_in_region( chrom_id, start, end, v )
        return v.array

    cpdef get_headers( self, char * chrom, bits32 start, bits32 end ):
        if start >= end:
            return None
        chrom_id, chrom_size = self._get_chrom_id_and_size( chrom )
        if chrom_id is None:
            return None
        v = BigWigHeaderBlockHandler( start, end )
        self.visit_blocks_in_region( chrom_id, start, end, v )
        return v.headers