1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
|
"""
Semi-random access to bz2 compressed data.
"""
import bisect
from ._seekbzip2 import SeekBzip2
class SeekableBzip2File:
    """
    Filelike object supporting read-only semi-random access to bz2 compressed
    files for which an offset table (bz2t) has been generated by `bzip-table`.

    The object tracks a virtual position (`pos`) in the uncompressed data.
    Seeking only updates that position and sets `dirty`; the underlying
    compressed stream is repositioned lazily on the next read.
    """

    def __init__(self, filename, table_filename, **kwargs):
        """
        Load the offset table and open the compressed file.

        `filename` is the bz2 file, `table_filename` the matching offset
        table. Extra keyword arguments are accepted and ignored.
        """
        self.filename = filename
        self.table_filename = table_filename
        self.init_table()
        self.init_bz2()
        self.pos = 0
        # Start dirty so the first read positions the compressed stream
        self.dirty = True
        self.closed = False

    def init_bz2(self):
        """Open the underlying `SeekBzip2` reader for `self.filename`."""
        self.seek_bz2 = SeekBzip2(self.filename)

    def init_table(self):
        """
        Read the offset table into two parallel lists and compute `size`.

        Each table line holds two whitespace separated integers: the position
        of a compressed block in the bz2 file and the length of that block
        when uncompressed.

        Raises AssertionError if a block length is not positive.
        """
        # Position in plaintext file
        self.table_positions = []
        # Position of corresponding block in bz2 file (bits)
        self.table_bz2positions = []
        pos = 0
        # `with` guarantees the table file is closed, even on a parse error
        with open(self.table_filename) as table:
            for line in table:
                fields = line.split()
                # Position of the compressed block in the bz2 file
                bz2_pos = int(fields[0])
                # Length of the block when uncompressed
                length = int(fields[1])
                self.table_positions.append(pos)
                self.table_bz2positions.append(bz2_pos)
                # Raised explicitly (rather than `assert`) so the check is
                # not stripped under `python -O`; type kept for compatibility
                if length <= 0:
                    raise AssertionError(
                        "Non-positive block length in offset table: %r" % length
                    )
                pos = pos + length
        self.size = pos

    def close(self):
        """Close the underlying reader and mark this object closed."""
        self.seek_bz2.close()
        self.closed = True

    def fix_dirty(self):
        """
        Bring the compressed stream in sync with the virtual position.

        Seeks to the start of the block containing `self.pos`, then reads and
        discards bytes up to the offset within that block.

        Raises AssertionError if fewer bytes than expected could be skipped.
        """
        # Our virtual position in the uncompressed data is out of sync
        # FIXME: If we're moving to a later position that is still in
        # the same block, we could just read and throw out bytes in the
        # compressed stream, less wasteful than backtracking
        chunk, offset = self.get_chunk_and_offset(self.pos)
        # Get the seek position for that chunk and seek to it
        self.seek_bz2.seek(self.table_bz2positions[chunk])
        # Consume bytes to move to the correct position. The read must happen
        # outside of any `assert`: under `python -O` an assert (and the read
        # inside it) would be removed and the stream would stay misaligned.
        skipped = self.seek_bz2.read(offset)
        # The reader signals EOF with None (see `_update_pos`)
        skipped_len = 0 if skipped is None else len(skipped)
        if skipped_len != offset:
            raise AssertionError(
                "Short read while repositioning: wanted %d bytes, got %d"
                % (offset, skipped_len)
            )
        # Update state
        self.dirty = False

    def read(self, sizehint=-1):
        """
        Read up to `sizehint` bytes; a negative `sizehint` reads to the end.
        """
        if sizehint >= 0:
            return self._read(sizehint)
        # Read everything that remains, in 1 MiB pieces
        chunks = []
        while True:
            val = self._read(1024 * 1024)
            if not val:
                break
            chunks.append(val)
        return b"".join(chunks)

    def _update_pos(self, val):
        """
        Advance the virtual position for data returned by the reader.

        A `None` from the reader means EOF: the position jumps to `size` and
        an empty bytes object is returned in its place.
        """
        if val is None:
            # EOF
            self.pos = self.size
            return b""
        self.pos = self.pos + len(val)
        return val

    def _read(self, size):
        """Read up to `size` bytes at the current virtual position."""
        if self.dirty:
            self.fix_dirty()
        return self._update_pos(self.seek_bz2.read(size))

    def readline(self, size=-1):
        """Read one line, passing `size` through to the underlying reader."""
        if self.dirty:
            self.fix_dirty()
        return self._update_pos(self.seek_bz2.readline(size))

    def tell(self):
        """Return the current position in the uncompressed data."""
        return self.pos

    def get_chunk_and_offset(self, position):
        """
        Return `(chunk, offset)`: the index of the table entry whose block
        contains `position`, and the distance of `position` from the start
        of that block.
        """
        # Find the chunk that position is in using a binary search
        chunk = bisect.bisect(self.table_positions, position) - 1
        offset = position - self.table_positions[chunk]
        return chunk, offset

    def seek(self, offset, whence=0):
        """
        Move the virtual position; the actual seek happens on the next read.

        `whence` 0 is absolute and 1 is relative to the current position.
        NOTE: for `whence` 2 the target is `size - offset`, i.e. `offset` is
        counted backwards from the end, which is the opposite sign from the
        `io` module convention. Kept for backward compatibility.

        Raises ValueError for an unknown `whence`, and AssertionError if the
        target differs from the current position and is not in `[0, size)`.
        """
        # Determine absolute target position
        if whence == 0:
            target_pos = offset
        elif whence == 1:
            target_pos = self.pos + offset
        elif whence == 2:
            target_pos = self.size - offset
        else:
            # The value is formatted into the message; ValueError is still an
            # Exception, so existing `except Exception` handlers keep working
            raise ValueError("Invalid `whence` argument: %r" % (whence,))
        # Check if this is a noop
        if target_pos == self.pos:
            return
        # Verify it is valid. Raised explicitly so the check survives
        # `python -O`; the exception type is kept for compatibility.
        if not 0 <= target_pos < self.size:
            raise AssertionError("Attempt to seek outside file")
        # Move the position
        self.pos = target_pos
        # Mark as dirty, the next time a read is done we need to actually
        # move the position in the bzip2 file
        self.dirty = True

    # ---- File like methods ------------------------------------------------

    def __next__(self):
        """Return the next line, raising StopIteration at end of data."""
        ln = self.readline()
        if ln == b"":
            raise StopIteration()
        return ln

    def __iter__(self):
        """The object is its own line iterator."""
        return self

    def flush(self):
        """No-op; the file is read-only."""
        pass

    def readable(self):
        """Always True."""
        return True

    def readlines(self, sizehint=-1):
        """Return all remaining lines as a list; `sizehint` is ignored."""
        return list(self)

    def seekable(self):
        """Always True."""
        return True

    def xreadlines(self):
        """Return an iterator over the remaining lines."""
        return iter(self)

    def writable(self):
        """Always False; the file is read-only."""
        return False
|