# This class takes advantage of the fact that all formats v0, v1 and v2 of
# message storage have the same byte offsets for the Length and Magic fields.
# Let's look closely at the leading bytes all versions share:
#
# V0 and V1 (Offset is part of the MessageSet, the other bytes belong to the
# Message itself):
#  Offset => Int64
#  BytesLength => Int32
#  CRC => Int32
#  Magic => Int8
#  ...
#
# V2:
#  BaseOffset => Int64
#  Length => Int32
#  PartitionLeaderEpoch => Int32
#  Magic => Int8
#  ...
#
# So we can iterate over batches just by knowing the offset of the Length
# field. Magic is used to construct the correct class for the Batch itself.
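#
# A minimal sketch of that idea (``buf`` and ``pos`` are hypothetical local
# names, not part of this module; the offsets follow the layouts above):
#
#   length, = struct.unpack_from(">i", buf, pos + 8)   # skip the Int64 offset
#   magic, = struct.unpack_from(">b", buf, pos + 16)   # skip offset, length, crc/epoch
#   batch_bytes = buf[pos: pos + 12 + length]          # 12 == log overhead (8 + 4)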
from __future__ import division

import struct

from kafka.errors import CorruptRecordException
from kafka.record.abc import ABCRecords
from kafka.record.legacy_records import LegacyRecordBatch, LegacyRecordBatchBuilder
from kafka.record.default_records import DefaultRecordBatch, DefaultRecordBatchBuilder


class MemoryRecords(ABCRecords):
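    """Read-side wrapper over a raw buffer of record batches.

    A minimal usage sketch (``raw_bytes`` stands for a fetched payload and is
    not produced by this module)::

        records = MemoryRecords(raw_bytes)
        while records.has_next():
            batch = records.next_batch()
            for record in batch:
                print(record.offset, record.key, record.value)
    """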
    LENGTH_OFFSET = struct.calcsize(">q")
    LOG_OVERHEAD = struct.calcsize(">qi")
    MAGIC_OFFSET = struct.calcsize(">qii")

    # Minimum space requirements for Record V0
    MIN_SLICE = LOG_OVERHEAD + LegacyRecordBatch.RECORD_OVERHEAD_V0

    __slots__ = ("_buffer", "_pos", "_next_slice", "_remaining_bytes")

    def __init__(self, bytes_data):
        self._buffer = bytes_data
        self._pos = 0
        # We keep one slice ahead, so `has_next` can return very fast
        self._next_slice = None
        self._remaining_bytes = None
        self._cache_next()

    def size_in_bytes(self):
        return len(self._buffer)

    def valid_bytes(self):
        # We need to scan the whole buffer to determine the number of valid
        # (i.e. parsable) bytes.
        # NOTE: the Fetcher calls this after iteration, by which point
        # `_remaining_bytes` is already cached, so this is fast.
        if self._remaining_bytes is None:
            next_slice = self._next_slice
            pos = self._pos
            while self._remaining_bytes is None:
                self._cache_next()
            # Reset the iterator to its previous position
            self._next_slice = next_slice
            self._pos = pos
        return len(self._buffer) - self._remaining_bytes

    # NOTE: we cache offsets here as kwargs for a bit more speed, as CPython
    # will use the LOAD_FAST opcode for them in this case
    def _cache_next(self, len_offset=LENGTH_OFFSET, log_overhead=LOG_OVERHEAD):
        buffer = self._buffer
        buffer_len = len(buffer)
        pos = self._pos
        remaining = buffer_len - pos
        if remaining < log_overhead:
            # Will be re-checked in Fetcher for remaining bytes.
            self._remaining_bytes = remaining
            self._next_slice = None
            return

        length, = struct.unpack_from(">i", buffer, pos + len_offset)

        slice_end = pos + log_overhead + length
        if slice_end > buffer_len:
            # Will be re-checked in Fetcher for remaining bytes
            self._remaining_bytes = remaining
            self._next_slice = None
            return

        self._next_slice = memoryview(buffer)[pos: slice_end]
        self._pos = slice_end

    def has_next(self):
        return self._next_slice is not None

    # NOTE: same kwargs caching for LOAD_FAST as above
    def next_batch(self, _min_slice=MIN_SLICE,
                   _magic_offset=MAGIC_OFFSET):
        next_slice = self._next_slice
        if next_slice is None:
            return None
        if len(next_slice) < _min_slice:
            raise CorruptRecordException(
                "Record size is less than the minimum record overhead "
                "({})".format(_min_slice - self.LOG_OVERHEAD))
        self._cache_next()
        magic, = struct.unpack_from(">b", next_slice, _magic_offset)
        if magic <= 1:
            return LegacyRecordBatch(next_slice, magic)
        else:
            return DefaultRecordBatch(next_slice)


class MemoryRecordsBuilder(object):
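    """Write-side helper that accumulates appended messages into one buffer.

    A minimal usage sketch (argument values are illustrative only)::

        builder = MemoryRecordsBuilder(magic=1, compression_type=0,
                                       batch_size=16384)
        builder.append(timestamp=1609459200000, key=b"key", value=b"value")
        builder.close()
        raw_bytes = builder.buffer()
    """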
__slots__ = ("_builder", "_batch_size", "_buffer", "_next_offset", "_closed",
"_bytes_written")
def __init__(self, magic, compression_type, batch_size):
assert magic in [0, 1, 2], "Not supported magic"
assert compression_type in [0, 1, 2, 3, 4], "Not valid compression type"
if magic >= 2:
self._builder = DefaultRecordBatchBuilder(
magic=magic, compression_type=compression_type,
is_transactional=False, producer_id=-1, producer_epoch=-1,
base_sequence=-1, batch_size=batch_size)
else:
self._builder = LegacyRecordBatchBuilder(
magic=magic, compression_type=compression_type,
batch_size=batch_size)
self._batch_size = batch_size
self._buffer = None
self._next_offset = 0
self._closed = False
self._bytes_written = 0
def append(self, timestamp, key, value, headers=[]):
""" Append a message to the buffer.
Returns: RecordMetadata or None if unable to append
"""
if self._closed:
return None
offset = self._next_offset
metadata = self._builder.append(offset, timestamp, key, value, headers)
# Return of None means there's no space to add a new message
if metadata is None:
return None
self._next_offset += 1
return metadata
def close(self):
# This method may be called multiple times on the same batch
# i.e., on retries
# we need to make sure we only close it out once
# otherwise compressed messages may be double-compressed
# see Issue 718
if not self._closed:
self._bytes_written = self._builder.size()
self._buffer = bytes(self._builder.build())
self._builder = None
self._closed = True
def size_in_bytes(self):
if not self._closed:
return self._builder.size()
else:
return len(self._buffer)
def compression_rate(self):
assert self._closed
return self.size_in_bytes() / self._bytes_written
def is_full(self):
if self._closed:
return True
else:
return self._builder.size() >= self._batch_size
def next_offset(self):
return self._next_offset
def buffer(self):
assert self._closed
return self._buffer