# -*- coding: utf-8 -*-
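"""Tests for the v2 (magic=2) DefaultRecordBatch builder and reader."""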
from __future__ import unicode_literals
import pytest
from mock import patch
import kafka.codec
from kafka.record.default_records import (
    DefaultRecordBatch, DefaultRecordBatchBuilder
)
from kafka.errors import UnsupportedCodecError


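# Round-trip a transactional v2 batch through the builder and reader for
# every supported codec and check that batch attributes and record fields
# survive serialization unchanged.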
@pytest.mark.parametrize("compression_type", [
    DefaultRecordBatch.CODEC_NONE,
    DefaultRecordBatch.CODEC_GZIP,
    DefaultRecordBatch.CODEC_SNAPPY,
    DefaultRecordBatch.CODEC_LZ4
])
def test_read_write_serde_v2(compression_type):
    builder = DefaultRecordBatchBuilder(
        magic=2, compression_type=compression_type, is_transactional=1,
        producer_id=123456, producer_epoch=123, base_sequence=9999,
        batch_size=999999)
    headers = [("header1", b"aaa"), ("header2", b"bbb")]
    for offset in range(10):
        builder.append(
            offset, timestamp=9999999, key=b"test", value=b"Super",
            headers=headers)
    buffer = builder.build()
    reader = DefaultRecordBatch(bytes(buffer))
    msgs = list(reader)

    assert reader.is_transactional is True
    assert reader.compression_type == compression_type
    assert reader.magic == 2
    assert reader.timestamp_type == 0
    assert reader.base_offset == 0
    for offset, msg in enumerate(msgs):
        assert msg.offset == offset
        assert msg.timestamp == 9999999
        assert msg.key == b"test"
        assert msg.value == b"Super"
        assert msg.headers == headers


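# size_in_bytes() computed before an append should match both the number of
# bytes the append actually writes and the size reported in its metadata.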
def test_written_bytes_equals_size_in_bytes_v2():
    key = b"test"
    value = b"Super"
    headers = [("header1", b"aaa"), ("header2", b"bbb"), ("xx", None)]
    builder = DefaultRecordBatchBuilder(
        magic=2, compression_type=0, is_transactional=0,
        producer_id=-1, producer_epoch=-1, base_sequence=-1,
        batch_size=999999)

    size_in_bytes = builder.size_in_bytes(
        0, timestamp=9999999, key=key, value=value, headers=headers)

    pos = builder.size()
    meta = builder.append(
        0, timestamp=9999999, key=key, value=value, headers=headers)

    assert builder.size() - pos == size_in_bytes
    assert meta.size == size_in_bytes


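# estimate_size_in_bytes() is an upper bound: a built batch holding the same
# key, value and headers must never exceed the estimate.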
def test_estimate_size_in_bytes_bigger_than_batch_v2():
    key = b"Super Key"
    value = b"1" * 100
    headers = [("header1", b"aaa"), ("header2", b"bbb")]
    estimate_size = DefaultRecordBatchBuilder.estimate_size_in_bytes(
        key, value, headers)

    builder = DefaultRecordBatchBuilder(
        magic=2, compression_type=0, is_transactional=0,
        producer_id=-1, producer_epoch=-1, base_sequence=-1,
        batch_size=999999)
    builder.append(
        0, timestamp=9999999, key=key, value=value, headers=headers)
    buf = builder.build()
    assert len(buf) <= estimate_size, \
        "Estimate should always be upper bound"


def test_default_batch_builder_validates_arguments():
    builder = DefaultRecordBatchBuilder(
        magic=2, compression_type=0, is_transactional=0,
        producer_id=-1, producer_epoch=-1, base_sequence=-1,
        batch_size=999999)

    # Key should not be str
    with pytest.raises(TypeError):
        builder.append(
            0, timestamp=9999999, key="some string", value=None, headers=[])

    # Value should not be str
    with pytest.raises(TypeError):
        builder.append(
            0, timestamp=9999999, key=None, value="some string", headers=[])

    # Timestamp should be of proper type
    with pytest.raises(TypeError):
        builder.append(
            0, timestamp="1243812793", key=None, value=b"some string",
            headers=[])

    # Offset of invalid type
    with pytest.raises(TypeError):
        builder.append(
            "0", timestamp=9999999, key=None, value=b"some string",
            headers=[])

    # Ok to pass value as None
    builder.append(
        0, timestamp=9999999, key=b"123", value=None, headers=[])

    # Timestamp can be None
    builder.append(
        1, timestamp=None, key=None, value=b"some string", headers=[])

    # Ok to pass offsets in non-incremental order, though this should not
    # happen in practice
    builder.append(
        5, timestamp=9999999, key=b"123", value=None, headers=[])

    # Check record with headers
    builder.append(
        6, timestamp=9999999, key=b"234", value=None,
        headers=[("hkey", b"hval")])

    # in case error handling code fails to fix inner buffer in builder
    assert len(builder.build()) == 124


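# append() returns DefaultRecordMetadata carrying the record's offset,
# timestamp and size; the per-record crc is None for v2 batches.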
def test_default_correct_metadata_response():
    builder = DefaultRecordBatchBuilder(
        magic=2, compression_type=0, is_transactional=0,
        producer_id=-1, producer_epoch=-1, base_sequence=-1,
        batch_size=1024 * 1024)
    meta = builder.append(
        0, timestamp=9999999, key=b"test", value=b"Super", headers=[])

    assert meta.offset == 0
    assert meta.timestamp == 9999999
    assert meta.crc is None
    assert meta.size == 16
    assert repr(meta) == (
        "DefaultRecordMetadata(offset=0, size={}, timestamp={})"
        .format(meta.size, meta.timestamp)
    )


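# batch_size is a soft limit: the first record is accepted even if it exceeds
# the limit, but later records that would overflow the batch are rejected and
# append() returns None for them.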
def test_default_batch_size_limit():
    # First message can be added even if it's too big
    builder = DefaultRecordBatchBuilder(
        magic=2, compression_type=0, is_transactional=0,
        producer_id=-1, producer_epoch=-1, base_sequence=-1,
        batch_size=1024)

    meta = builder.append(
        0, timestamp=None, key=None, value=b"M" * 2000, headers=[])
    assert meta.size > 0
    assert meta.crc is None
    assert meta.offset == 0
    assert meta.timestamp is not None
    assert len(builder.build()) > 2000

    builder = DefaultRecordBatchBuilder(
        magic=2, compression_type=0, is_transactional=0,
        producer_id=-1, producer_epoch=-1, base_sequence=-1,
        batch_size=1024)

    meta = builder.append(
        0, timestamp=None, key=None, value=b"M" * 700, headers=[])
    assert meta is not None
    meta = builder.append(
        1, timestamp=None, key=None, value=b"M" * 700, headers=[])
    assert meta is None
    meta = builder.append(
        2, timestamp=None, key=None, value=b"M" * 700, headers=[])
    assert meta is None
    assert len(builder.build()) < 1000


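# When the required compression library is unavailable (the has_* checker is
# patched to return False), both the builder and the reader should raise
# UnsupportedCodecError naming the missing codec.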
@pytest.mark.parametrize("compression_type,name,checker_name", [
(DefaultRecordBatch.CODEC_GZIP, "gzip", "has_gzip"),
(DefaultRecordBatch.CODEC_SNAPPY, "snappy", "has_snappy"),
(DefaultRecordBatch.CODEC_LZ4, "lz4", "has_lz4")
])
@pytest.mark.parametrize("magic", [0, 1])
def test_unavailable_codec(magic, compression_type, name, checker_name):
builder = DefaultRecordBatchBuilder(
magic=2, compression_type=compression_type, is_transactional=0,
producer_id=-1, producer_epoch=-1, base_sequence=-1,
batch_size=1024)
builder.append(0, timestamp=None, key=None, value=b"M" * 2000, headers=[])
correct_buffer = builder.build()
with patch.object(kafka.codec, checker_name) as mocked:
mocked.return_value = False
# Check that builder raises error
builder = DefaultRecordBatchBuilder(
magic=2, compression_type=compression_type, is_transactional=0,
producer_id=-1, producer_epoch=-1, base_sequence=-1,
batch_size=1024)
error_msg = "Libraries for {} compression codec not found".format(name)
with pytest.raises(UnsupportedCodecError, match=error_msg):
builder.append(0, timestamp=None, key=None, value=b"M", headers=[])
builder.build()
# Check that reader raises same error
batch = DefaultRecordBatch(bytes(correct_buffer))
with pytest.raises(UnsupportedCodecError, match=error_msg):
list(batch)