1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
|
from __future__ import absolute_import
import struct
from struct import error
from kafka.protocol.abstract import AbstractType
def _pack(f, value):
    """Pack ``value`` with the packing callable ``f``.

    Args:
        f: callable that takes one value and returns its packed form,
            e.g. the bound ``pack`` method of a ``struct.Struct``.
        value: the value to pack.

    Returns:
        Whatever ``f(value)`` returns.

    Raises:
        ValueError: if ``f`` raises ``struct.error``; the message carries
            the offending value, the struct format and the original error.
    """
    try:
        return f(value)
    except error as e:
        # Report the real format string (e.g. '>h') when f is a bound Struct
        # method; formatting f itself would only print the repr of the
        # method object. Any other callable is reported unchanged.
        owner = getattr(f, '__self__', None)
        fmt = owner.format if isinstance(owner, struct.Struct) else f
        raise ValueError("Error encountered when attempting to convert value: "
                         "{!r} to struct format: '{}', hit error: {}"
                         .format(value, fmt, e))
def _unpack(f, data):
    """Unpack a single value from ``data`` with the unpacking callable ``f``.

    Args:
        f: callable that takes the raw data and returns a one-element
            sequence, e.g. the bound ``unpack`` method of a single-field
            ``struct.Struct``.
        data: the raw data handed to ``f``.

    Returns:
        The sole element of the sequence returned by ``f(data)``.

    Raises:
        ValueError: if ``f`` raises ``struct.error`` (for instance when
            ``data`` is shorter than the format requires); the message
            carries the data, the struct format and the original error.
    """
    try:
        (value,) = f(data)
        return value
    except error as e:
        # Report the real format string (e.g. '>i') when f is a bound Struct
        # method; formatting f itself would only print the repr of the
        # method object. Any other callable is reported unchanged.
        owner = getattr(f, '__self__', None)
        fmt = owner.format if isinstance(owner, struct.Struct) else f
        raise ValueError("Error encountered when attempting to convert value: "
                         "{!r} to struct format: '{}', hit error: {}"
                         .format(data, fmt, e))
class Int8(AbstractType):
    """Signed 8-bit integer, encoded with struct format ``'>b'``."""

    # Bound methods of precompiled Structs. Inside the methods below the bare
    # names _pack/_unpack resolve to the module-level helpers, not to these
    # class attributes.
    _pack = struct.Struct('>b').pack
    _unpack = struct.Struct('>b').unpack

    @classmethod
    def encode(cls, value):
        """Return ``value`` packed into one byte.

        Raises:
            ValueError: if the value cannot be packed.
        """
        packer = cls._pack
        return _pack(packer, value)

    @classmethod
    def decode(cls, data):
        """Read one byte from ``data`` (via ``data.read``) and unpack it.

        Raises:
            ValueError: if the bytes read cannot be unpacked.
        """
        raw = data.read(1)
        return _unpack(cls._unpack, raw)
class Int16(AbstractType):
    """Signed 16-bit big-endian integer, encoded with struct format ``'>h'``."""

    # Bound methods of precompiled Structs. Inside the methods below the bare
    # names _pack/_unpack resolve to the module-level helpers, not to these
    # class attributes.
    _pack = struct.Struct('>h').pack
    _unpack = struct.Struct('>h').unpack

    @classmethod
    def encode(cls, value):
        """Return ``value`` packed into two bytes.

        Raises:
            ValueError: if the value cannot be packed.
        """
        packer = cls._pack
        return _pack(packer, value)

    @classmethod
    def decode(cls, data):
        """Read two bytes from ``data`` (via ``data.read``) and unpack them.

        Raises:
            ValueError: if the bytes read cannot be unpacked.
        """
        raw = data.read(2)
        return _unpack(cls._unpack, raw)
class Int32(AbstractType):
    """Signed 32-bit big-endian integer, encoded with struct format ``'>i'``."""

    # Bound methods of precompiled Structs. Inside the methods below the bare
    # names _pack/_unpack resolve to the module-level helpers, not to these
    # class attributes.
    _pack = struct.Struct('>i').pack
    _unpack = struct.Struct('>i').unpack

    @classmethod
    def encode(cls, value):
        """Return ``value`` packed into four bytes.

        Raises:
            ValueError: if the value cannot be packed.
        """
        packer = cls._pack
        return _pack(packer, value)

    @classmethod
    def decode(cls, data):
        """Read four bytes from ``data`` (via ``data.read``) and unpack them.

        Raises:
            ValueError: if the bytes read cannot be unpacked.
        """
        raw = data.read(4)
        return _unpack(cls._unpack, raw)
class Int64(AbstractType):
    """Signed 64-bit big-endian integer, encoded with struct format ``'>q'``."""

    # Bound methods of precompiled Structs. Inside the methods below the bare
    # names _pack/_unpack resolve to the module-level helpers, not to these
    # class attributes.
    _pack = struct.Struct('>q').pack
    _unpack = struct.Struct('>q').unpack

    @classmethod
    def encode(cls, value):
        """Return ``value`` packed into eight bytes.

        Raises:
            ValueError: if the value cannot be packed.
        """
        packer = cls._pack
        return _pack(packer, value)

    @classmethod
    def decode(cls, data):
        """Read eight bytes from ``data`` (via ``data.read``) and unpack them.

        Raises:
            ValueError: if the bytes read cannot be unpacked.
        """
        raw = data.read(8)
        return _unpack(cls._unpack, raw)
class String(AbstractType):
    """Length-prefixed text: an Int16 byte count followed by the encoded text.

    A length of -1 is written for ``None``; any negative length decodes to
    ``None``.
    """

    def __init__(self, encoding='utf-8'):
        """Remember the text ``encoding`` used by encode() and decode()."""
        self.encoding = encoding

    def encode(self, value):
        """Return the wire form of ``value``.

        ``None`` becomes a bare Int16 -1. Anything else is passed through
        ``str()``, encoded with ``self.encoding`` and prefixed with the
        encoded byte count.
        """
        if value is not None:
            payload = str(value).encode(self.encoding)
            return Int16.encode(len(payload)) + payload
        return Int16.encode(-1)

    def decode(self, data):
        """Read one string from ``data`` (an object exposing ``read(n)``).

        Returns:
            The decoded text, or ``None`` when the length prefix is negative.

        Raises:
            ValueError: if fewer bytes than announced could be read.
        """
        size = Int16.decode(data)
        if size < 0:
            return None
        raw = data.read(size)
        if len(raw) == size:
            return raw.decode(self.encoding)
        raise ValueError('Buffer underrun decoding string')
class Bytes(AbstractType):
    """Length-prefixed binary blob: an Int32 byte count followed by the bytes.

    A length of -1 is written for ``None``; any negative length decodes to
    ``None``.
    """

    @classmethod
    def encode(cls, value):
        """Return the wire form of ``value`` (``None`` becomes Int32 -1)."""
        if value is not None:
            return Int32.encode(len(value)) + value
        return Int32.encode(-1)

    @classmethod
    def decode(cls, data):
        """Read one blob from ``data`` (an object exposing ``read(n)``).

        Returns:
            The bytes read, or ``None`` when the length prefix is negative.

        Raises:
            ValueError: if fewer bytes than announced could be read.
        """
        size = Int32.decode(data)
        if size < 0:
            return None
        payload = data.read(size)
        if len(payload) != size:
            raise ValueError('Buffer underrun decoding Bytes')
        return payload

    @classmethod
    def repr(cls, value):
        """Return a printable form of ``value``.

        Values longer than 100 bytes are cut to their first 100 bytes with
        ``b'...'`` appended.
        """
        # Inside this method the bare name `repr` is the builtin, not this
        # classmethod.
        if value is not None and len(value) > 100:
            return repr(value[:100] + b'...')
        return repr(value)
class Boolean(AbstractType):
    """Boolean stored in a single byte, encoded with struct format ``'>?'``."""

    # Bound methods of precompiled Structs. Inside the methods below the bare
    # names _pack/_unpack resolve to the module-level helpers, not to these
    # class attributes.
    _pack = struct.Struct('>?').pack
    _unpack = struct.Struct('>?').unpack

    @classmethod
    def encode(cls, value):
        """Return ``value`` packed into one byte.

        Raises:
            ValueError: if the value cannot be packed.
        """
        packer = cls._pack
        return _pack(packer, value)

    @classmethod
    def decode(cls, data):
        """Read one byte from ``data`` (via ``data.read``) and unpack it.

        Raises:
            ValueError: if the bytes read cannot be unpacked.
        """
        raw = data.read(1)
        return _unpack(cls._unpack, raw)
class Schema(AbstractType):
    """Ordered sequence of named fields, encoded back to back."""

    def __init__(self, *fields):
        """Build a schema from ``(name, type)`` pairs.

        The pairs are split into the parallel tuples ``self.names`` and
        ``self.fields``; with no arguments both are empty tuples.
        """
        if not fields:
            self.names, self.fields = (), ()
        else:
            self.names, self.fields = zip(*fields)

    def encode(self, item):
        """Encode ``item`` positionally, one element per schema field.

        Raises:
            ValueError: if ``len(item)`` differs from the number of fields.
        """
        if len(item) != len(self.fields):
            raise ValueError('Item field count does not match Schema')
        chunks = []
        for position, field in enumerate(self.fields):
            chunks.append(field.encode(item[position]))
        return b''.join(chunks)

    def decode(self, data):
        """Decode every field from ``data`` in order and return them as a tuple."""
        decoded = []
        for field in self.fields:
            decoded.append(field.decode(data))
        return tuple(decoded)

    def __len__(self):
        """Return the number of fields in the schema."""
        return len(self.fields)

    def repr(self, value):
        """Return ``(name=value, ...)`` for ``value``, best effort.

        Each field value is looked up as an attribute named after the field,
        falling back to positional indexing. If anything goes wrong the plain
        ``repr(value)`` is returned instead.
        """
        try:
            rendered = []
            for index, field in enumerate(self.fields):
                name = self.names[index]
                try:
                    field_val = getattr(value, name)
                except AttributeError:
                    field_val = value[index]
                rendered.append('%s=%s' % (name, field.repr(field_val)))
            return '(' + ', '.join(rendered) + ')'
        except Exception:
            # Deliberate fallback: never let a repr helper raise.
            return repr(value)
class Array(AbstractType):
    """Int32-length-prefixed sequence of elements of a single type.

    A length of -1 stands for ``None``.
    """

    def __init__(self, *array_of):
        """Set the element type of the array.

        Args:
            *array_of: either a single AbstractType instance or subclass, or
                several arguments that are forwarded to ``Schema`` to build
                a composite element type.

        Raises:
            ValueError: if no argument is given, or the single argument is
                neither an AbstractType instance nor an AbstractType subclass.
        """
        if len(array_of) > 1:
            self.array_of = Schema(*array_of)
            return
        if len(array_of) == 1:
            candidate = array_of[0]
            try:
                accepted = (isinstance(candidate, AbstractType) or
                            issubclass(candidate, AbstractType))
            except TypeError:
                # issubclass() raises TypeError when its first argument is
                # not a class (e.g. Array(5)); treat that as "not a valid
                # element type" so the ValueError below is raised instead.
                accepted = False
            if accepted:
                self.array_of = candidate
                return
        raise ValueError('Array instantiated with no array_of type')

    def encode(self, items):
        """Return the Int32 element count followed by each encoded element.

        ``None`` is encoded as a bare Int32 -1.
        """
        if items is None:
            return Int32.encode(-1)
        return b''.join(
            [Int32.encode(len(items))] +
            [self.array_of.encode(item) for item in items]
        )

    def decode(self, data):
        """Read an Int32 count from ``data`` and decode that many elements.

        Returns:
            A list of decoded elements, or ``None`` when the count is -1.
        """
        length = Int32.decode(data)
        if length == -1:
            return None
        return [self.array_of.decode(data) for _ in range(length)]

    def repr(self, list_of_items):
        """Return ``[item, ...]`` using the element type's ``repr``.

        ``None`` is rendered as ``'NULL'``.
        """
        if list_of_items is None:
            return 'NULL'
        return '[' + ', '.join([self.array_of.repr(item) for item in list_of_items]) + ']'
|