1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
|
from struct import Struct
from .varint import read_varint
def read_binary_str(buf):
    """
    Reads a length-prefixed string from buffer.

    The length is taken from a leading varint (``read_varint``); the payload
    is then read and decoded by ``read_binary_str_fixed_len``.
    """
    size = read_varint(buf)
    text = read_binary_str_fixed_len(buf, size)
    return text
def read_binary_bytes(buf):
    """
    Reads a length-prefixed chunk of raw data from buffer.

    The length is taken from a leading varint (``read_varint``); the payload
    is then read by ``read_binary_bytes_fixed_len`` and returned undecoded.
    """
    size = read_varint(buf)
    payload = read_binary_bytes_fixed_len(buf, size)
    return payload
def read_binary_str_fixed_len(buf, length):
    """
    Reads data of the given length from buffer and decodes it as UTF-8.
    """
    raw = read_binary_bytes_fixed_len(buf, length)
    return raw.decode('utf-8')
def read_binary_bytes_fixed_len(buf, length):
    """
    Reads data of the given length from buffer.

    Returns whatever ``buf.read(length)`` yields, unmodified.
    """
    chunk = buf.read(length)
    return chunk
def read_binary_int(buf, fmt):
    """
    Unpacks a single value from buffer using struct format ``fmt``.

    As many bytes as the format occupies are requested from ``buf``; the
    first unpacked field is returned.
    """
    # The '<' prefix selects little-endian byte order.
    layout = Struct('<' + fmt)
    raw = buf.read(layout.size)
    fields = layout.unpack(raw)
    return fields[0]
def read_binary_int8(buf):
    """
    Reads a signed 8-bit integer (struct format ``'b'``) from buffer.
    """
    value = read_binary_int(buf, 'b')
    return value
def read_binary_int16(buf):
    """
    Reads a signed 16-bit integer (struct format ``'h'``) from buffer.
    """
    value = read_binary_int(buf, 'h')
    return value
def read_binary_int32(buf):
    """
    Reads a signed 32-bit integer (struct format ``'i'``) from buffer.
    """
    value = read_binary_int(buf, 'i')
    return value
def read_binary_int64(buf):
    """
    Reads a signed 64-bit integer (struct format ``'q'``) from buffer.
    """
    value = read_binary_int(buf, 'q')
    return value
def read_binary_uint8(buf):
    """
    Reads an unsigned 8-bit integer (struct format ``'B'``) from buffer.
    """
    value = read_binary_int(buf, 'B')
    return value
def read_binary_uint16(buf):
    """
    Reads an unsigned 16-bit integer (struct format ``'H'``) from buffer.
    """
    value = read_binary_int(buf, 'H')
    return value
def read_binary_uint32(buf):
    """
    Reads an unsigned 32-bit integer (struct format ``'I'``) from buffer.
    """
    value = read_binary_int(buf, 'I')
    return value
def read_binary_uint64(buf):
    """
    Reads an unsigned 64-bit integer (struct format ``'Q'``) from buffer.
    """
    value = read_binary_int(buf, 'Q')
    return value
def read_binary_uint128(buf):
    """
    Reads two consecutive unsigned 64-bit integers and joins them into one.

    The first value read forms the upper 64 bits of the result, the second
    value read forms the lower 64 bits.
    """
    # Read order matters: upper half first, then lower half.
    upper = read_binary_int(buf, 'Q')
    lower = read_binary_int(buf, 'Q')
    return lower + (upper << 64)
|