1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
|
from __future__ import absolute_import
import abc
class ABCRecord(abc.ABCMeta("_ABCRecordBase", (object,), {"__slots__": ()})):
    """ Abstract interface of a single record.

    The base class is produced by calling ``abc.ABCMeta`` directly so that
    the metaclass is applied on both Python 2 and Python 3. A class-level
    ``__metaclass__`` attribute is ignored by Python 3, which would leave
    the abstract properties below unenforced. Subclasses must override
    every property before they can be instantiated.
    """

    # Keep instances free of a per-instance ``__dict__``; the generated base
    # class declares empty slots as well so this is not silently undone.
    __slots__ = ()

    @abc.abstractproperty
    def offset(self):
        """ Absolute offset of record
        """

    @abc.abstractproperty
    def timestamp(self):
        """ Epoch milliseconds
        """

    @abc.abstractproperty
    def timestamp_type(self):
        """ CREATE_TIME(0) or APPEND_TIME(1)
        """

    @abc.abstractproperty
    def key(self):
        """ Bytes key or None
        """

    @abc.abstractproperty
    def value(self):
        """ Bytes value or None
        """

    @abc.abstractproperty
    def checksum(self):
        """ Prior to v2 format CRC was contained in every message. This will
            be the checksum for v0 and v1 and None for v2 and above.
        """

    @abc.abstractproperty
    def headers(self):
        """ If supported by version list of key-value tuples, or empty list if
            not supported by format.
        """
class ABCRecordBatchBuilder(
        abc.ABCMeta("_ABCRecordBatchBuilderBase", (object,), {"__slots__": ()})):
    """ Abstract interface of an object that accumulates records and turns
    them into a finished batch.

    The base class is produced by calling ``abc.ABCMeta`` directly so that
    the metaclass is applied on both Python 2 and Python 3. A class-level
    ``__metaclass__`` attribute is ignored by Python 3, which would leave
    the abstract methods below unenforced.
    """

    # Keep instances free of a per-instance ``__dict__``; the generated base
    # class declares empty slots as well so this is not silently undone.
    __slots__ = ()

    @abc.abstractmethod
    def append(self, offset, timestamp, key, value, headers=None):
        """ Writes record to internal buffer.

        Arguments:
            offset (int): Relative offset of record, starting from 0
            timestamp (int or None): Timestamp in milliseconds since beginning
                of the epoch (midnight Jan 1, 1970 (UTC)). If omitted, will be
                set to current time.
            key (bytes or None): Key of the record
            value (bytes or None): Value of the record
            headers (List[Tuple[str, bytes]]): Headers of the record. Header
                keys can not be ``None``.

        Returns:
            (bytes, int): Checksum of the written record (or None for v2 and
                above) and size of the written record.
        """

    @abc.abstractmethod
    def size_in_bytes(self, offset, timestamp, key, value, headers):
        """ Return the expected size change on buffer (uncompressed) if we add
            this message. This will account for varint size changes and give a
            reliable size.
        """

    @abc.abstractmethod
    def build(self):
        """ Close for append, compress if needed, write size and header and
            return a ready to send buffer object.

        Returns:
            bytearray: finished batch, ready to send.
        """
class ABCRecordBatch(
        abc.ABCMeta("_ABCRecordBatchBase", (object,), {"__slots__": ()})):
    """ For v2 encapsulates a RecordBatch, for v0/v1 a single (maybe
        compressed) message.

    The base class is produced by calling ``abc.ABCMeta`` directly so that
    the metaclass is applied on both Python 2 and Python 3. A class-level
    ``__metaclass__`` attribute is ignored by Python 3, which would leave
    the abstract method below unenforced.
    """

    # Keep instances free of a per-instance ``__dict__``; the generated base
    # class declares empty slots as well so this is not silently undone.
    __slots__ = ()

    @abc.abstractmethod
    def __iter__(self):
        """ Return iterator over records (ABCRecord instances). Will decompress
            if needed.
        """
class ABCRecords(abc.ABCMeta("_ABCRecordsBase", (object,), {"__slots__": ()})):
    """ Abstract interface of a reader that walks a buffer batch by batch.

    The base class is produced by calling ``abc.ABCMeta`` directly so that
    the metaclass is applied on both Python 2 and Python 3. A class-level
    ``__metaclass__`` attribute is ignored by Python 3, which would leave
    the abstract methods below unenforced.
    """

    # Keep instances free of a per-instance ``__dict__``; the generated base
    # class declares empty slots as well so this is not silently undone.
    __slots__ = ()

    @abc.abstractmethod
    def __init__(self, buffer):
        """ Initialize with bytes-like object conforming to the buffer
            interface (ie. bytes, bytearray, memoryview etc.).
        """

    @abc.abstractmethod
    def size_in_bytes(self):
        """ Returns the size of inner buffer.
        """

    @abc.abstractmethod
    def next_batch(self):
        """ Return next batch of records (ABCRecordBatch instances).
        """

    @abc.abstractmethod
    def has_next(self):
        """ True if there are more batches to read, False otherwise.
        """
|