1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""BufferWorkSpace module."""
import os
new_lines = ["\r\n", "\n", "\r"]
new_lines_bytes = [n.encode("ascii") for n in new_lines] # we only support encodings that's backward compat with ascii
class BufferWorkSpace:
"""It is a helper module for FileReadBackwards."""
def __init__(self, fp, chunk_size):
"""Convention for the data.
When read_buffer is not None, it represents contents of the file from `read_position` onwards
that has not been processed/returned.
read_position represents the file pointer position that has been read into read_buffer
initialized to be just past the end of file.
"""
self.fp = fp
self.read_position = _get_file_size(self.fp) # set the previously read position to the
self.read_buffer = None
self.chunk_size = chunk_size
def add_to_buffer(self, content, read_position):
"""Add additional bytes content as read from the read_position.
Args:
content (bytes): data to be added to buffer working BufferWorkSpac.
read_position (int): where in the file pointer the data was read from.
"""
self.read_position = read_position
if self.read_buffer is None:
self.read_buffer = content
else:
self.read_buffer = content + self.read_buffer
def yieldable(self):
"""Return True if there is a line that the buffer can return, False otherwise."""
if self.read_buffer is None:
return False
t = _remove_trailing_new_line(self.read_buffer)
n = _find_furthest_new_line(t)
if n >= 0:
return True
# we have read in entire file and have some unprocessed lines
if self.read_position == 0 and self.read_buffer is not None:
return True
return False
def return_line(self):
"""Return a new line if it is available.
Precondition: self.yieldable() must be True
"""
assert(self.yieldable()) # noqa: E275
t = _remove_trailing_new_line(self.read_buffer)
i = _find_furthest_new_line(t)
if i >= 0:
delimiter = i + 1
after_new_line = slice(delimiter, None)
up_to_include_new_line = slice(0, delimiter)
r = t[after_new_line]
self.read_buffer = t[up_to_include_new_line]
else: # the case where we have read in entire file and at the "last" line
r = t
self.read_buffer = None
return r
def read_until_yieldable(self):
"""Read in additional chunks until it is yieldable."""
while not self.yieldable():
read_content, read_position = _get_next_chunk(self.fp, self.read_position, self.chunk_size)
self.add_to_buffer(read_content, read_position)
def has_returned_every_line(self):
"""Return True if every single line in the file has been returned, False otherwise."""
if self.read_position == 0 and self.read_buffer is None:
return True
return False
def _get_file_size(fp):
return os.fstat(fp.fileno()).st_size
def _get_next_chunk(fp, previously_read_position, chunk_size):
"""Return next chunk of data that we would from the file pointer.
Args:
fp: file-like object
previously_read_position: file pointer position that we have read from
chunk_size: desired read chunk_size
Returns:
(bytestring, int): data that has been read in, the file pointer position where the data has been read from
"""
seek_position, read_size = _get_what_to_read_next(fp, previously_read_position, chunk_size)
fp.seek(seek_position)
read_content = fp.read(read_size)
read_position = seek_position
return read_content, read_position
def _get_what_to_read_next(fp, previously_read_position, chunk_size):
"""Return information on which file pointer position to read from and how many bytes.
Args:
fp
past_read_positon (int): The file pointer position that has been read previously
chunk_size(int): ideal io chunk_size
Returns:
(int, int): The next seek position, how many bytes to read next
"""
seek_position = max(previously_read_position - chunk_size, 0)
read_size = chunk_size
# examples: say, our new_lines are potentially "\r\n", "\n", "\r"
# find a reading point where it is not "\n", rewind further if necessary
# if we have "\r\n" and we read in "\n",
# the next iteration would treat "\r" as a different new line.
# Q: why don't I just check if it is b"\n", but use a function ?
# A: so that we can potentially expand this into generic sets of separators, later on.
while seek_position > 0:
fp.seek(seek_position)
if _is_partially_read_new_line(fp.read(1)):
seek_position -= 1
read_size += 1 # as we rewind further, let's make sure we read more to compensate
else:
break
# take care of special case when we are back to the beginnin of the file
read_size = min(previously_read_position - seek_position, read_size)
return seek_position, read_size
def _remove_trailing_new_line(line):
"""Remove a single instance of new line at the end of line if it exists.
Returns:
bytestring
"""
# replace only 1 instance of newline
# match longest line first (hence the reverse=True), we want to match "\r\n" rather than "\n" if we can
for n in sorted(new_lines_bytes, key=lambda x: len(x), reverse=True):
if line.endswith(n):
remove_new_line = slice(None, -len(n))
return line[remove_new_line]
return line
def _find_furthest_new_line(read_buffer):
"""Return -1 if read_buffer does not contain new line otherwise the position of the rightmost newline.
Args:
read_buffer (bytestring)
Returns:
int: The right most position of new line character in read_buffer if found, else -1
"""
new_line_positions = [read_buffer.rfind(n) for n in new_lines_bytes]
return max(new_line_positions)
def _is_partially_read_new_line(b):
"""Return True when b is part of a new line separator found at index >= 1, False otherwise.
Args:
b (bytestring)
Returns:
bool
"""
for n in new_lines_bytes:
if n.find(b) >= 1:
return True
return False
|