1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
|
# encoding: utf-8
from __future__ import absolute_import, division, print_function
from struct import Struct
from .exceptions import UnexpectedEndOfFileError
# Byte-order markers. The values are the `struct` format prefix characters
# for little-endian ('<') and big-endian ('>') layouts respectively.
LITTLE_ENDIAN = '<'
BIG_ENDIAN = '>'
class StreamReader(object):
    """
    Wraps a file-like object to provide access to structured data from a
    binary file. Byte-order is configurable. *base_offset* is added to any
    base value provided to calculate actual location for reads.

    For every ``read_*`` method, passing ``None`` as *base* reads from the
    current position of the underlying stream instead of seeking first; any
    *offset* is ignored in that case.
    """

    def __init__(self, stream, byte_order, base_offset=0):
        """
        *stream* is the file-like object to read from. *byte_order* selects
        little-endian layout when it equals ``LITTLE_ENDIAN``; any other
        value selects big-endian. *base_offset* is added to every explicit
        seek location.
        """
        super(StreamReader, self).__init__()
        self._stream = stream
        # Normalised to exactly one of the two module constants so the value
        # can be used directly as a `struct` format prefix later on.
        self._byte_order = (
            LITTLE_ENDIAN if byte_order == LITTLE_ENDIAN else BIG_ENDIAN
        )
        self._base_offset = base_offset

    def read(self, count):
        """
        Allow pass-through read() call
        """
        return self._stream.read(count)

    def read_byte(self, base, offset=0):
        """
        Return the int value of the byte at the file position defined by
        self._base_offset + *base* + *offset*. If *base* is None, the byte is
        read from the current position in the stream.
        """
        # A single byte has no byte order, so no prefix is required.
        return self._read_int('B', base, offset)

    def read_long(self, base, offset=0):
        """
        Return the int value of the four bytes at the file position defined by
        self._base_offset + *base* + *offset*. If *base* is None, the long is
        read from the current position in the stream. The endian setting of
        this instance is used to interpret the byte layout of the long.
        """
        return self._read_int(self._int_format('L'), base, offset)

    def read_short(self, base, offset=0):
        """
        Return the int value of the two bytes at the file position determined
        by *base* and *offset*, similarly to ``read_long()`` above.
        """
        return self._read_int(self._int_format('H'), base, offset)

    def read_str(self, char_count, base, offset=0):
        """
        Return a string containing the *char_count* bytes at the file
        position determined by self._base_offset + *base* + *offset*, decoded
        as UTF-8. If *base* is None, the bytes are read from the current
        position in the stream.
        """
        struct = Struct('%ds' % char_count)
        chars = self._unpack_item(struct, base, offset)
        return chars.decode('UTF-8')

    def seek(self, base, offset=0):
        """
        Position the stream at self._base_offset + *base* + *offset*.
        """
        location = self._base_offset + base + offset
        self._stream.seek(location)

    def tell(self):
        """
        Allow pass-through tell() call
        """
        return self._stream.tell()

    def _int_format(self, type_code):
        """
        Return the `struct` format string for *type_code* (e.g. ``'H'`` or
        ``'L'``) prefixed with this reader's byte-order character.
        """
        return self._byte_order + type_code

    def _read_bytes(self, byte_count, base, offset):
        """
        Return exactly *byte_count* bytes read from the stream. The stream is
        first positioned at self._base_offset + *base* + *offset*, unless
        *base* is None, in which case reading starts at the current position.
        Raises |UnexpectedEndOfFileError| if fewer than *byte_count* bytes
        are available.
        """
        # A base of None means "continue from wherever the stream is now", so
        # the stream is only repositioned when an explicit base is supplied.
        if base is not None:
            self.seek(base, offset)
        bytes_ = self._stream.read(byte_count)
        if len(bytes_) < byte_count:
            raise UnexpectedEndOfFileError
        return bytes_

    def _read_int(self, fmt, base, offset):
        """
        Return the integer unpacked with `struct` format *fmt* from the
        location determined by *base* and *offset*.
        """
        return self._unpack_item(Struct(fmt), base, offset)

    def _unpack_item(self, struct, base, offset):
        """
        Read ``struct.size`` bytes from the location determined by *base* and
        *offset* and return the first item unpacked from them by *struct*.
        """
        bytes_ = self._read_bytes(struct.size, base, offset)
        return struct.unpack(bytes_)[0]
|