# cython: profile=False
"""
Core implementation for reading UCSC "big binary indexed" files.
There isn't really any specification for the format beyond the code, so this
mostly mirrors Jim Kent's 'bbiRead.c'.
"""
from cpython.version cimport PY_MAJOR_VERSION
import math
import sys
import zlib
from collections import deque
from io import BytesIO
import numpy
from bx.misc.binary_file import BinaryFileReader
cimport cython
from libc cimport limits
from .bpt_file cimport BPTFile
from .cirtree_file cimport CIRTreeFile
from .types cimport (
bits32,
bits64,
)
cdef extern from "Python.h":
char * PyBytes_AsString( object )
# Signatures for bbi related file types
cdef public int big_wig_sig = 0x888FFC26
cdef public int big_bed_sig = 0x8789F2EB
# Some record sizes for parsing
DEF summary_on_disk_size = 32
@cython.profile(False)
cdef inline int range_intersection( int start1, int end1, int start2, int end2 ):
return min( end1, end2 ) - max( start1, start2 )
@cython.profile(False)
cdef inline int imax(int a, int b): return a if a >= b else b
@cython.profile(False)
cdef inline int imin(int a, int b): return a if a <= b else b
cdef enum summary_type:
summary_type_mean = 0
summary_type_max = 1
summary_type_min = 2
summary_type_coverage = 3
summary_type_sd = 4
cdef class SummaryBlock:
"""
A block of summary data from disk
"""
pass
cdef class SummarizedData:
"""
    The result of using SummaryBlocks read from the file to produce an
    aggregation over a particular range and resolution
"""
def __init__( self, bits32 start, bits32 end, int size ):
self.start = start
self.end = end
self.size = size
self.valid_count = numpy.zeros( self.size, dtype=numpy.float64 )
self.min_val = numpy.zeros( self.size, dtype=numpy.float64 )
self.max_val = numpy.zeros( self.size, dtype=numpy.float64 )
self.sum_data = numpy.zeros( self.size, dtype=numpy.float64 )
self.sum_squares = numpy.zeros( self.size, dtype=numpy.float64 )
cdef accumulate_interval_value( self, bits32 s, bits32 e, float val ):
cdef int base_start, base_end, base_step, overlap, j, interval_size
cdef double overlap_factor, interval_weight
# We locally cdef the arrays so all indexing will be at C speeds
cdef numpy.ndarray[numpy.float64_t, ndim=1] valid_count = self.valid_count
cdef numpy.ndarray[numpy.float64_t, ndim=1] min_val = self.min_val
cdef numpy.ndarray[numpy.float64_t, ndim=1] max_val = self.max_val
cdef numpy.ndarray[numpy.float64_t, ndim=1] sum_data = self.sum_data
cdef numpy.ndarray[numpy.float64_t, ndim=1] sum_squares = self.sum_squares
# Trim interval down to region of interest
if s < self.start:
s = self.start
if e > self.end:
e = self.end
if s >= e:
return
base_step = ( self.end - self.start ) // self.size
for j from 0 <= j < self.size:
base_start = self.start + ( base_step * j )
base_end = base_start + base_step
overlap = range_intersection( base_start, base_end, s, e )
if overlap > 0:
interval_size = e - s
overlap_factor = <double> overlap / interval_size
interval_weight = interval_size * overlap_factor
valid_count[j] += interval_weight
sum_data[j] += val * interval_weight
sum_squares[j] += val * val * interval_weight
if max_val[j] < val:
max_val[j] = val
if min_val[j] > val:
min_val[j] = val
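# A worked example of accumulate_interval_value, with hypothetical numbers:
# given start=0, end=100, size=10, each bin spans base_step = 10 bases. An
# interval (5, 25) with val=2.0 has interval_size = 20 and overlaps bins 0-2
# by 5, 10 and 5 bases respectively, so:
#
#     bin 0: overlap_factor = 5/20,  interval_weight = 5,  sum_data[0] += 10.0
#     bin 1: overlap_factor = 10/20, interval_weight = 10, sum_data[1] += 20.0
#     bin 2: overlap_factor = 5/20,  interval_weight = 5,  sum_data[2] += 10.0
#
# Since interval_weight reduces to the overlap in bases, valid_count ends up
# holding the number of covered bases per bin.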
cdef class BlockHandler:
"""
Callback for `BBIFile.visit_blocks_in_region`
"""
cdef handle_block( self, bytes block_data, BBIFile bbi_file ):
pass
cdef class BBIFile:
"""
A "big binary indexed" file. Stores blocks of raw data and numeric
summaries of that data at different levels of aggregation ("zoom levels").
Generic enough to accommodate both wiggle and bed data.
"""
def __init__( self, file=None, expected_sig=None, type_name=None ):
if file is not None:
self.open( file, expected_sig, type_name )
def open( self, file, expected_sig, type_name ):
"""
        Initialize from an existing bbi file. The signature (magic) must be
        passed in since this class is generic.
"""
assert expected_sig is not None
self.file = file
        # Open the file in a BinaryFileReader, which handles the magic and byteswapping
self.reader = reader = BinaryFileReader( file, expected_sig )
self.magic = expected_sig
self.is_byteswapped = self.reader.byteswap_needed
# Read header stuff
self.version = reader.read_uint16()
self.zoom_levels = reader.read_uint16()
self.chrom_tree_offset = reader.read_uint64()
self.unzoomed_data_offset = reader.read_uint64()
self.unzoomed_index_offset = reader.read_uint64()
self.field_count = reader.read_uint16()
self.defined_field_count = reader.read_uint16()
self.as_offset = reader.read_uint64()
self.total_summary_offset = reader.read_uint64()
self.uncompress_buf_size = reader.read_uint32()
# Skip reserved
reader.seek( 64 )
# Read zoom headers
self.level_list = []
for i from 0 <= i < self.zoom_levels:
level = ZoomLevel()
level.bbi_file = self
level.reduction_level = reader.read_uint32()
level.reserved = reader.read_uint32()
level.data_offset = reader.read_uint64()
level.index_offset = reader.read_uint64()
self.level_list.append( level )
# Initialize and attach embedded BPTFile containing chromosome names and ids
reader.seek( self.chrom_tree_offset )
self.chrom_bpt = BPTFile( file=self.file )
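    # For reference, the common BBI header read above is 64 bytes total.
    # Assuming a little-endian file (BinaryFileReader transparently handles
    # byteswapped files), the fields correspond to the struct sketch below;
    # this is documentation only and is not used by the code:
    #
    #     import struct
    #     BBI_HEADER = struct.Struct( "<IHHQQQHHQQI8x" )
    #     # magic, version, zoom_levels, chrom_tree_offset,
    #     # unzoomed_data_offset, unzoomed_index_offset, field_count,
    #     # defined_field_count, as_offset, total_summary_offset,
    #     # uncompress_buf_size, 8 reserved bytes
    #     assert BBI_HEADER.size == 64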
cdef visit_blocks_in_region( self, bits32 chrom_id, bits32 start, bits32 end, BlockHandler handler ):
"""
Visit each block from the full data that overlaps a specific region
"""
cdef CIRTreeFile ctf
reader = self.reader
reader.seek( self.unzoomed_index_offset )
ctf = CIRTreeFile( reader.file )
block_list = ctf.find_overlapping_blocks( chrom_id, start, end )
for offset, size in block_list:
# Seek to and read all data for the block
reader.seek( offset )
block_data = reader.read( size )
# Might need to uncompress
if self.uncompress_buf_size > 0:
block_data = zlib.decompress( block_data )
handler.handle_block( block_data, self )
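    # A minimal sketch of the BlockHandler callback pattern. The handler
    # below is hypothetical (the real handlers live in the format-specific
    # subclasses), and since visit_blocks_in_region is a cdef method it can
    # only be driven from Cython code:
    #
    #     cdef class BlockSizePrinter( BlockHandler ):
    #         cdef handle_block( self, bytes block_data, BBIFile bbi_file ):
    #             print( len( block_data ) )
    #
    #     self.visit_blocks_in_region( chrom_id, start, end, BlockSizePrinter() )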
cpdef summarize( self, object chrom, bits32 start, bits32 end, int summary_size ):
"""
        Gets `summary_size` data points over the region `chrom`:`start`-`end`.
"""
cdef char * cchrom
if PY_MAJOR_VERSION >= 3:
bytes_chrom = chrom.encode()
else:
bytes_chrom = chrom
cchrom = PyBytes_AsString(bytes_chrom)
if start >= end:
return None
chrom_id, chrom_size = self._get_chrom_id_and_size( cchrom )
if chrom_id is None:
return None
        # Return value will be a structured array (rather than an array of
        # summary element structures)
# Find appropriate zoom level
cdef bits32 base_size = end - start
cdef int full_reduction = base_size // summary_size
cdef int zoom = full_reduction // 2
if zoom < 0:
zoom = 0
cdef ZoomLevel zoom_level = self._best_zoom_level( zoom )
if zoom_level is not None:
return zoom_level._summarize( chrom_id, start, end, summary_size )
else:
return self._summarize_from_full( chrom_id, start, end, summary_size )
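    # A minimal usage sketch from Python, assuming the BigWigFile subclass in
    # bx.bbi.bigwig_file (which supplies _summarize_from_full):
    #
    #     from bx.bbi.bigwig_file import BigWigFile
    #     bw = BigWigFile( open( "test.bw", "rb" ) )
    #     summarized = bw.summarize( "chr1", 0, 10000, 10 )
    #     # per-bin means; bins with valid_count == 0 come out as nan
    #     means = summarized.sum_data / summarized.valid_count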
cpdef summarize_from_full( self, char * chrom, bits32 start, bits32 end, int summary_size ):
"""
        Gets `summary_size` data points over the region `chrom`:`start`-`end`,
        always using the raw data points.
"""
if start >= end:
return None
chrom_id, chrom_size = self._get_chrom_id_and_size( chrom )
if chrom_id is None:
return None
        # Return value will be a structured array (rather than an array of
        # summary element structures)
return self._summarize_from_full( chrom_id, start, end, summary_size )
cpdef query( self, object chrom, bits32 start, bits32 end, int summary_size ):
"""
Provides a different view of summary for region, a list of dictionaries
with keys: mean, max, min, coverage, std_dev
"""
        if end > 2147483647 or start < 0:
            raise ValueError( "start must be non-negative and end must fit in a signed 32 bit integer" )
results = self.summarize(chrom, start, end, summary_size)
if not results:
return None
rval = []
for i in range(summary_size):
sum_data = results.sum_data[i]
valid_count = results.valid_count[i]
mean = sum_data / valid_count
coverage = <double> summary_size / (end - start) * valid_count
variance = results.sum_squares[i] - sum_data * sum_data / valid_count
if valid_count > 1:
variance /= valid_count - 1
std_dev = math.sqrt(max(variance, 0))
            rval.append( { "mean": mean, "max": results.max_val[i], "min": results.min_val[i],
                           "coverage": coverage, "std_dev": std_dev } )
return rval
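    # The variance above is the standard one-pass formula: with n the
    # (fractional) valid_count,
    #
    #     Var = ( sum_squares - sum_data**2 / n ) / ( n - 1 )
    #
    # clamped at zero before the square root to absorb floating point error.
    # With hypothetical values n=4, sum_data=10, sum_squares=30 this gives
    # (30 - 25) / 3 = 5/3.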
cdef _get_chrom_id_and_size( self, char * chrom ):
"""
Lookup id and size from the chromosome named `chrom`
"""
        value = self.chrom_bpt.find( chrom )
        if value is not None:
            # The value is two 32 bit uints; use the BPT's reader so byteswapping is handled
            assert len( value ) == 8
            chrom_id, chrom_size = self.chrom_bpt.reader.unpack( "II", value )
return chrom_id, chrom_size
else:
return None, None
cdef _summarize_from_full( self, bits32 chrom_id, bits32 start, bits32 end, int summary_size ):
"""
        Create summary from full data. This is data-type specific, so it must be overridden by subclasses.
"""
pass
cdef _best_zoom_level( self, int desired_reduction ):
if desired_reduction <= 1:
return None
cdef ZoomLevel level, closest_level
cdef int diff, closest_diff = limits.INT_MAX
closest_level = None
for level in self.level_list:
diff = desired_reduction - level.reduction_level
if diff >= 0 and diff < closest_diff:
closest_diff = diff
closest_level = level
return closest_level
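    # A worked example of the selection rule, with hypothetical levels: for
    # reduction levels [10, 40, 160] and desired_reduction = 100 the diffs
    # are [90, 60, -60]; the smallest non-negative diff wins, so the level
    # with reduction 40 is chosen, i.e. the coarsest level that does not
    # exceed the desired reduction.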
cdef class ZoomLevel:
cdef BBIFile bbi_file
cdef public bits32 reduction_level
cdef bits32 reserved
cdef public bits64 data_offset
cdef public bits64 index_offset
cdef int item_count
def _summary_blocks_in_region( self, bits32 chrom_id, bits32 start, bits32 end ):
"""
Return a list of all SummaryBlocks that overlap the region
`chrom_id`:`start`-`end`
"""
cdef CIRTreeFile ctf
cdef SummaryBlock summary
rval = deque()
reader = self.bbi_file.reader
reader.seek( self.index_offset )
ctf = CIRTreeFile( reader.file )
block_list = ctf.find_overlapping_blocks( chrom_id, start, end )
for offset, size in block_list:
# Seek to and read all data for the block
reader.seek( offset )
block_data = reader.read( size )
# Might need to uncompress
if self.bbi_file.uncompress_buf_size > 0:
block_data = zlib.decompress( block_data )
block_size = len( block_data )
# The block should be a bunch of summaries.
assert block_size % summary_on_disk_size == 0
            item_count = block_size // summary_on_disk_size
# Create another reader just for the block, shouldn't be too expensive
block_reader = BinaryFileReader( BytesIO( block_data ), is_little_endian=reader.is_little_endian )
for i from 0 <= i < item_count:
## NOTE: Look carefully at bbiRead again to be sure the endian
## conversion here is all correct. It looks like it is
## just pushing raw data into memory and not swapping
sum_chrom_id = block_reader.read_uint32()
                # A block can contain summaries from more than one chrom_id
if sum_chrom_id != chrom_id:
block_reader.skip(7*4)
continue
summary = SummaryBlock()
summary.chrom_id = sum_chrom_id
summary.start = block_reader.read_uint32()
summary.end = block_reader.read_uint32()
summary.valid_count = block_reader.read_uint32()
summary.min_val = block_reader.read_float()
summary.max_val = block_reader.read_float()
summary.sum_data = block_reader.read_float()
summary.sum_squares = block_reader.read_float()
rval.append( summary )
return rval
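    # The 32-byte on-disk summary record parsed above corresponds to this
    # struct sketch (little-endian shown; block_reader honours the file's
    # actual byte order). Documentation only, not used by the code:
    #
    #     import struct
    #     SUMMARY_RECORD = struct.Struct( "<IIIIffff" )
    #     # chrom_id, start, end, valid_count (uint32),
    #     # min_val, max_val, sum_data, sum_squares (float32)
    #     assert SUMMARY_RECORD.size == 32  # == summary_on_disk_size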
cdef _get_summary_slice( self, bits32 base_start, bits32 base_end, summaries ):
cdef float valid_count = 0.0
cdef float sum_data = 0.0
cdef float sum_squares = 0.0
cdef float min_val = numpy.nan
cdef float max_val = numpy.nan
cdef float overlap_factor
cdef int overlap
if summaries:
min_val = summaries[0].min_val
max_val = summaries[0].max_val
for summary in summaries:
if summary.start >= base_end:
break
overlap = range_intersection( base_start, base_end, summary.start, summary.end )
if overlap > 0:
overlap_factor = <double> overlap / (summary.end - summary.start)
valid_count += summary.valid_count * overlap_factor
sum_data += summary.sum_data * overlap_factor
sum_squares += summary.sum_squares * overlap_factor
if max_val < summary.max_val:
max_val = summary.max_val
if min_val > summary.min_val:
min_val = summary.min_val
return valid_count, sum_data, sum_squares, min_val, max_val
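    # Proration example, with hypothetical numbers: a summary covering
    # (0, 100) with valid_count = 100 that overlaps a slice (80, 120) by 20
    # bases has overlap_factor = 20/100, so it contributes 20 toward the
    # slice's valid_count. This assumes the summary's data is spread evenly
    # across its span.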
cdef _summarize( self, bits32 chrom_id, bits32 start, bits32 end, int summary_size ):
"""
Summarize directly from file.
Looking at Jim's code, it appears that
- bbiSummariesInRegion returns all summaries that span start-end in
sorted order
- bbiSummarySlice is then used to aggregate over the subset of those
summaries that overlap a single summary element
"""
cdef bits32 base_start, base_end, base_step
# We locally cdef the arrays so all indexing will be at C speeds
cdef numpy.ndarray[numpy.float64_t] valid_count
cdef numpy.ndarray[numpy.float64_t] min_val
cdef numpy.ndarray[numpy.float64_t] max_val
cdef numpy.ndarray[numpy.float64_t] sum_data
cdef numpy.ndarray[numpy.float64_t] sum_squares
# What we will load into
rval = SummarizedData( start, end, summary_size )
valid_count = rval.valid_count
min_val = rval.min_val
max_val = rval.max_val
sum_data = rval.sum_data
sum_squares = rval.sum_squares
# First, load up summaries
reader = self.bbi_file.reader
reader.seek( self.index_offset )
summaries = self._summary_blocks_in_region(chrom_id, start, end)
base_step = (end - start) // summary_size
base_start = start
base_end = start
for i in range(summary_size):
base_end += base_step
while summaries and summaries[0].end <= base_start:
summaries.popleft()
valid_count[i], sum_data[i], sum_squares[i], min_val[i], max_val[i] = self._get_summary_slice(base_start, base_end, summaries)
base_start = base_end
return rval
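    # The loop above slides a window of width base_step across the request,
    # popping summaries off the deque once they end at or before the current
    # window start, so each summary is examined only while it can still
    # overlap. E.g. for start=0, end=1000, summary_size=10, base_step = 100
    # and the windows visited are (0,100), (100,200), ..., (900,1000).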