1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
|
from io import BytesIO
from bx_extras.lrucache import LRUCache
# Default number of blocks kept in the in-memory LRU cache.
DEFAULT_CACHE_SIZE = 10
# Default size, in bytes, of each cached block (2 MiB).
DEFAULT_BLOCK_SIZE = 1024 * 1024 * 2


class FileCache:
    """
    Wrapper for a file that caches blocks of data in memory.

    The wrapped file is read in fixed-size blocks; recently used blocks are
    kept in an LRU cache keyed by block index so repeated reads of the same
    region do not hit the underlying file again.

    **NOTE:** this is currently an incomplete file-like object, it only
    supports seek, tell, and readline (plus iteration). Reading bytes is
    currently not implemented.
    """

    def __init__(self, file, size, cache_size=DEFAULT_CACHE_SIZE, block_size=DEFAULT_BLOCK_SIZE):
        """
        Create a new `FileCache` wrapping the file-like object `file` that
        has total size `size` and caching blocks of size `block_size`.

        `cache_size` is the capacity handed to the LRU cache that stores
        the loaded blocks.
        """
        self.file = file
        self.size = size
        self.cache_size = cache_size
        self.block_size = block_size
        # Setup the cache
        self.nblocks = (self.size // self.block_size) + 1
        self.cache = LRUCache(self.cache_size)
        # Position in file. `dirty` means `current_block` is not yet
        # positioned at `file_pos`; it is fixed lazily on the next read.
        self.dirty = True
        self.at_eof = False
        self.file_pos = 0
        self.current_block_index = -1
        self.current_block = None

    def fix_dirty(self):
        """
        Make `current_block` an in-memory stream positioned at `file_pos`,
        loading a different block if the position lies outside the current one.
        """
        chunk, offset = self.get_block_and_offset(self.file_pos)
        if self.current_block_index != chunk:
            self.current_block = BytesIO(self.load_block(chunk))
            self.current_block_index = chunk
        self.current_block.seek(offset)
        self.dirty = False

    def get_block_and_offset(self, index):
        """
        Translate the absolute file position `index` into a
        `(block index, offset within that block)` pair of ints.
        """
        return int(index // self.block_size), int(index % self.block_size)

    def load_block(self, index):
        """
        Return the bytes of block number `index`, reading them from the
        underlying file (and caching them) if they are not cached yet.
        """
        if index in self.cache:
            return self.cache[index]
        real_offset = index * self.block_size
        self.file.seek(real_offset)
        block = self.file.read(self.block_size)
        self.cache[index] = block
        return block

    def seek(self, offset, whence=0):
        """
        Move the file pointer to a particular offset.

        `whence` is 0 (absolute), 1 (relative to the current position) or
        2 (relative to the end of the file). NOTE: for `whence == 2` the
        target is `size - offset`, i.e. a *positive* `offset` counts
        backwards from the end; this differs from standard file objects and
        is kept as-is for existing callers.

        Raises `ValueError` for any other `whence` and `AssertionError` when
        the target position lies outside `[0, size)`.
        """
        # Determine absolute target position
        if whence == 0:
            target_pos = offset
        elif whence == 1:
            target_pos = self.file_pos + offset
        elif whence == 2:
            target_pos = self.size - offset
        else:
            raise ValueError("Invalid `whence` argument: %r" % (whence,))
        # Check if this is a noop
        if target_pos == self.file_pos:
            return
        # Verify it is valid
        assert 0 <= target_pos < self.size, "Attempt to seek outside file"
        # Move the position
        self.file_pos = target_pos
        # Mark as dirty, the next time a read is done we need to actually
        # reposition within the cached blocks
        self.dirty = True
        # A valid target is strictly before the end of the file, so any
        # EOF state reached by an earlier read no longer applies.
        self.at_eof = False

    def tell(self):
        """
        Return the current absolute position in the file.
        """
        return self.file_pos

    def readline(self):
        """
        Read and return the next line as bytes, including its trailing
        newline if present. Returns `b""` once the end of the file has been
        reached. Lines spanning several blocks are stitched together.
        """
        if self.dirty:
            self.fix_dirty()
        if self.at_eof:
            return b""
        pieces = []
        while True:
            piece = self.current_block.readline()
            pieces.append(piece)
            # `endswith` rather than `piece[-1] == b"\n"`: indexing bytes
            # yields an int on Python 3, which never equals b"\n".
            if piece.endswith(b"\n"):
                break
            if self.current_block_index == self.nblocks - 1:
                # Ran out of data in the last block: end of file.
                self.at_eof = True
                break
            # Line continues in the next block.
            self.current_block_index += 1
            self.current_block = BytesIO(self.load_block(self.current_block_index))
        line = b"".join(pieces)
        # Keep the logical position in sync so later seeks (including
        # relative ones and the no-op check) see where we really are.
        self.file_pos += len(line)
        return line

    def __next__(self):
        """
        Return the next line, raising `StopIteration` at end of file.
        """
        line = self.readline()
        if line == b"":
            raise StopIteration
        return line

    def __iter__(self):
        """
        The cache is its own line iterator.
        """
        return self

    def close(self):
        """
        Close the underlying file object.
        """
        self.file.close()
|