1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
|
import sys
__all__ = ["ReceiveBuffer"]
# Operations we want to support:
# - find next \r\n or \r\n\r\n, or wait until there is one
# - read at-most-N bytes
# Goals:
# - on average, do this fast
# - worst case, do this in O(n) where n is the number of bytes processed
# Plan:
# - store bytearray, offset, how far we've searched for a separator token
# - use the how-far-we've-searched data to avoid rescanning
# - while doing a stream of uninterrupted processing, advance offset instead
# of constantly copying
# WARNING:
# - I haven't benchmarked or profiled any of this yet.
#
# Note that starting in Python 3.4, deleting the initial n bytes from a
# bytearray is amortized O(n), thanks to some excellent work by Antoine
# Pitrou:
#
# https://bugs.python.org/issue19087
#
# This means that if we only supported 3.4+, we could get rid of the code here
# involving self._start and self.compress, because it's doing exactly the same
# thing that bytearray now does internally.
#
# BUT unfortunately, we still support 2.7, and reading short segments out of a
# long buffer MUST be O(bytes read) to avoid DoS issues, so we can't actually
# delete this code. Yet:
#
# https://pythonclock.org/
#
# (Two things to double-check first though: make sure PyPy also has the
# optimization, and benchmark to make sure it's a win, since we do have a
# slightly clever thing where we delay calling compress() until we've
# processed a whole event, which could in theory be slightly more efficient
# than the internal bytearray support.)
class ReceiveBuffer(object):
    """Byte accumulator that hands data back out in parser-sized pieces.

    Incoming bytes are appended with ``+=``.  Consumers then pull data out
    with the ``maybe_extract_*`` methods, each of which returns ``None``
    when the request cannot be satisfied from what is buffered so far.

    Extraction only moves an offset forward; the consumed prefix is
    physically dropped later, by an explicit call to ``compress()``.
    """

    def __init__(self):
        # Everything appended so far, including a consumed prefix that
        # compress() has not dropped yet.
        self._data = bytearray()
        # Absolute index into self._data of the first unconsumed byte.
        self._start = 0
        # Absolute index into self._data recorded by the most recent
        # failed separator search, together with the separator it was for.
        self._looked_at = 0
        self._looked_for = b""

    def __len__(self):
        """Return the number of unconsumed bytes."""
        return len(self._data) - self._start

    def __bool__(self):
        """Return True when at least one unconsumed byte is buffered."""
        return len(self) > 0

    # for @property unprocessed_data
    def __bytes__(self):
        """Return a ``bytes`` copy of the unconsumed data."""
        pending = self._data[self._start:]
        return bytes(pending)

    if sys.version_info[0] < 3:  # version specific: Python 2
        __str__ = __bytes__
        __nonzero__ = __bool__

    def __iadd__(self, byteslike):
        """Append ``byteslike`` to the end of the buffer; return ``self``."""
        self._data += byteslike
        return self

    def compress(self):
        """Drop the consumed prefix, if doing so is worthwhile.

        Heuristic: nothing happens unless the consumed prefix is more than
        half of the underlying storage.
        """
        consumed = self._start
        if consumed <= len(self._data) // 2:
            return
        del self._data[:consumed]
        # Both offsets are absolute, so they shift by the amount removed.
        self._looked_at -= consumed
        self._start = 0

    def maybe_extract_at_most(self, count):
        """Consume and return up to ``count`` unconsumed bytes.

        Returns a ``bytearray``, or ``None`` if that would be empty.
        """
        begin = self._start
        chunk = self._data[begin:begin + count]
        if len(chunk) == 0:
            return None
        self._start = begin + len(chunk)
        return chunk

    def maybe_extract_until_next(self, needle):
        """Consume and return everything up to and including ``needle``.

        Returns a ``bytearray`` ending in ``needle`` on success.  On
        failure returns ``None``, consumes nothing, and records how far
        the search got so a repeated search for the same ``needle`` can
        skip the region already covered.
        """
        search_from = self._start
        if self._looked_for == needle:
            # Back up by len(needle) - 1 so that a match straddling the
            # end of the previously searched region is still found.
            resume_at = self._looked_at - len(needle) + 1
            search_from = max(search_from, resume_at)

        found_at = self._data.find(needle, search_from)
        if found_at < 0:
            self._looked_at = len(self._data)
            self._looked_for = needle
            return None

        end = found_at + len(needle)
        extracted = self._data[self._start:end]
        self._start = end
        return extracted

    # HTTP/1.1 has a number of constructs where you keep reading lines until
    # you see a blank one. This does that, and then returns the lines.
    def maybe_extract_lines(self):
        """Consume a run of CRLF-terminated lines ending with a blank line.

        Returns the lines (without terminators and without the blank line)
        as a list of ``bytearray``; an empty list when the unconsumed data
        begins with a blank line; or ``None``, consuming nothing, when no
        blank line is buffered yet.
        """
        if self._data.startswith(b"\r\n", self._start):
            # Immediate blank line: consume only that CRLF.
            self._start += 2
            return []

        block = self.maybe_extract_until_next(b"\r\n\r\n")
        if block is None:
            return None

        pieces = block.split(b"\r\n")
        # block ends in CRLF CRLF, so the split leaves two empty trailers.
        assert pieces[-2] == pieces[-1] == b""
        return pieces[:-2]
|