# -*- coding: utf-8 -*-
import collections
import logging
import threading
import time

from mock import MagicMock, patch
from . import unittest

from kafka import KafkaClient, SimpleProducer, KeyedProducer
from kafka.common import (
    AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError,
    ProduceResponse, RetryOptions, TopicAndPartition
)
from kafka.producer.base import Producer, _send_upstream
from kafka.protocol import CODEC_NONE

try:
    from queue import Empty, Queue  # Python 3
except ImportError:
    from Queue import Empty, Queue  # Python 2

try:
    xrange
except NameError:
    xrange = range  # Python 3 merged xrange into range
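
# These tests cover both producer code paths: the synchronous
# Producer.send_messages() API and the asynchronous path, where a
# background thread drains an internal queue via _send_upstream().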


class TestKafkaProducer(unittest.TestCase):

    def test_producer_message_types(self):
        producer = Producer(MagicMock())
        topic = b"test-topic"
        partition = 0

        bad_data_types = (u'你怎么样?', 12, ['a', 'list'],
                          ('a', 'tuple'), {'a': 'dict'}, None,)
        for m in bad_data_types:
            with self.assertRaises(TypeError):
                logging.debug("attempting to send message of type %s", type(m))
                producer.send_messages(topic, partition, m)

        good_data_types = (b'a string!',)
        for m in good_data_types:
            # This should not raise an exception
            producer.send_messages(topic, partition, m)

    def test_keyedproducer_message_types(self):
        client = MagicMock()
        client.get_partition_ids_for_topic.return_value = [0, 1]
        producer = KeyedProducer(client)
        topic = b"test-topic"
        key = b"testkey"

        bad_data_types = (u'你怎么样?', 12, ['a', 'list'],
                          ('a', 'tuple'), {'a': 'dict'},)
        for m in bad_data_types:
            with self.assertRaises(TypeError):
                logging.debug("attempting to send message of type %s", type(m))
                producer.send_messages(topic, key, m)

        # note that None is accepted here, unlike in the unkeyed test above
        good_data_types = (b'a string!', None,)
        for m in good_data_types:
            # This should not raise an exception
            producer.send_messages(topic, key, m)

    def test_topic_message_types(self):
        client = MagicMock()

        def partitions(topic):
            return [0, 1]

        client.get_partition_ids_for_topic = partitions
        producer = SimpleProducer(client, random_start=False)
        topic = b"test-topic"
        producer.send_messages(topic, b'hi')
        assert client.send_produce_request.called

    @patch('kafka.producer.base._send_upstream')
    def test_producer_async_queue_overfilled(self, mock):
        queue_size = 2
        producer = Producer(MagicMock(), async=True,
                            async_queue_maxsize=queue_size)

        topic = b'test-topic'
        partition = 0
        message = b'test-message'

        with self.assertRaises(AsyncProducerQueueFull):
            message_list = [message] * (queue_size + 1)
            producer.send_messages(topic, partition, *message_list)
        self.assertEqual(producer.queue.qsize(), queue_size)
        for _ in xrange(producer.queue.qsize()):
            producer.queue.get()

    def test_producer_sync_fail_on_error(self):
        error = FailedPayloadsError('failure')
        with patch.object(KafkaClient, 'load_metadata_for_topics'):
            with patch.object(KafkaClient, 'get_partition_ids_for_topic', return_value=[0, 1]):
                with patch.object(KafkaClient, '_send_broker_aware_request', return_value=[error]):

                    client = KafkaClient(MagicMock())

                    producer = SimpleProducer(client, async=False, sync_fail_on_error=False)
                    # This should not raise
                    (response,) = producer.send_messages('foobar', b'test message')
                    self.assertEqual(response, error)

                    producer = SimpleProducer(client, async=False, sync_fail_on_error=True)
                    with self.assertRaises(FailedPayloadsError):
                        producer.send_messages('foobar', b'test message')

    def test_cleanup_is_not_called_on_stopped_producer(self):
        producer = Producer(MagicMock(), async=True)
        producer.stopped = True
        with patch.object(producer, 'stop') as mocked_stop:
            producer._cleanup_func(producer)
            self.assertEqual(mocked_stop.call_count, 0)

    def test_cleanup_is_called_on_running_producer(self):
        producer = Producer(MagicMock(), async=True)
        producer.stopped = False
        with patch.object(producer, 'stop') as mocked_stop:
            producer._cleanup_func(producer)
            self.assertEqual(mocked_stop.call_count, 1)
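

# _send_upstream() is the loop run by the async producer's background
# thread: it drains the queue into batches, flushing when either the batch
# length or the batch time configured below is reached, and retries failed
# requests according to RetryOptions. The tests below drive it directly on
# a short-lived thread instead of going through Producer.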
class TestKafkaProducerSendUpstream(unittest.TestCase):

    def setUp(self):
        self.client = MagicMock()
        self.queue = Queue()

    def _run_process(self, retries_limit=3, sleep_timeout=1):
        # run _send_upstream process with the queue
        stop_event = threading.Event()
        retry_options = RetryOptions(limit=retries_limit,
                                     backoff_ms=50,
                                     retry_on_timeouts=False)
        self.thread = threading.Thread(
            target=_send_upstream,
            args=(self.queue, self.client, CODEC_NONE,
                  0.3,  # batch time (seconds)
                  3,    # batch length
                  Producer.ACK_AFTER_LOCAL_WRITE,
                  Producer.DEFAULT_ACK_TIMEOUT,
                  retry_options,
                  stop_event))
        self.thread.daemon = True
        self.thread.start()
        time.sleep(sleep_timeout)
        stop_event.set()
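
    # Worked example of the batching math used below: with a batch length
    # of 3, ten queued messages flush as 3 full batches of 3 plus one
    # final batch of 1, i.e. 4 produce requests when nothing fails.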
    def test_wo_retries(self):
        # let's create a queue and add 10 messages for 1 partition
        for i in range(10):
            self.queue.put((TopicAndPartition("test", 0), "msg %i" % i, "key %i" % i))

        self._run_process()

        # the queue should be empty at the end of the test
        self.assertEqual(self.queue.empty(), True)

        # there should be 4 produce requests:
        # 3 batches of 3 msgs each + 1 batch of 1 message
        self.assertEqual(self.client.send_produce_request.call_count, 4)

    def test_first_send_failed(self):
        # let's create a queue and add 10 messages for 10 different
        # partitions to show how retries should work ideally
        for i in range(10):
            self.queue.put((TopicAndPartition("test", i), "msg %i" % i, "key %i" % i))

        # offsets counter, captured by the closure below
        offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))
        self.client.is_first_time = True

        def send_side_effect(reqs, *args, **kwargs):
            # fail every request on the first call, succeed afterwards
            if self.client.is_first_time:
                self.client.is_first_time = False
                return [FailedPayloadsError(req) for req in reqs]
            responses = []
            for req in reqs:
                offset = offsets[req.topic][req.partition]
                offsets[req.topic][req.partition] += len(req.messages)
                responses.append(
                    ProduceResponse(req.topic, req.partition, 0, offset)
                )
            return responses

        self.client.send_produce_request.side_effect = send_side_effect

        self._run_process(2)

        # the queue should be empty at the end of the test
        self.assertEqual(self.queue.empty(), True)

        # there should be 5 produce requests: the failed first batch of
        # 3 msgs, plus 3 batches of 3 msgs each + 1 batch of 1 message
        self.assertEqual(self.client.send_produce_request.call_count, 5)

    def test_with_limited_retries(self):
        # let's create a queue and add 10 messages for 10 different
        # partitions to show how retries should work ideally
        for i in range(10):
            self.queue.put((TopicAndPartition("test", i), "msg %i" % i, "key %i" % i))

        def send_side_effect(reqs, *args, **kwargs):
            # every request fails, so the retry limit is always exhausted
            return [FailedPayloadsError(req) for req in reqs]

        self.client.send_produce_request.side_effect = send_side_effect

        self._run_process(3, 3)

        # the queue should be empty at the end of the test
        self.assertEqual(self.queue.empty(), True)

        # there should be 16 produce requests:
        # 3 initial batches of 3 msgs each + 1 initial batch of 1 msg +
        # 3 retries of each batch = (1 + 3 retries) * 4 batches = 16
        self.assertEqual(self.client.send_produce_request.call_count, 16)
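
    # Unlike the transport-level FailedPayloadsError above, a retriable
    # error code inside an otherwise successful ProduceResponse (here
    # NotLeaderForPartitionError.errno) should also trigger a retry.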
    def test_async_producer_not_leader(self):
        for i in range(10):
            self.queue.put((TopicAndPartition("test", i), "msg %i" % i, "key %i" % i))

        # offsets counter, captured by the closure below
        offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))
        self.client.is_first_time = True

        def send_side_effect(reqs, *args, **kwargs):
            # answer the first call with not-leader error codes,
            # succeed afterwards
            if self.client.is_first_time:
                self.client.is_first_time = False
                return [ProduceResponse(req.topic, req.partition,
                                        NotLeaderForPartitionError.errno, -1)
                        for req in reqs]
            responses = []
            for req in reqs:
                offset = offsets[req.topic][req.partition]
                offsets[req.topic][req.partition] += len(req.messages)
                responses.append(
                    ProduceResponse(req.topic, req.partition, 0, offset)
                )
            return responses

        self.client.send_produce_request.side_effect = send_side_effect

        self._run_process(2)

        # the queue should be empty at the end of the test
        self.assertEqual(self.queue.empty(), True)

        # there should be 5 produce requests: the failed first batch of 3
        # msgs + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5
        self.assertEqual(self.client.send_produce_request.call_count, 5)

    def tearDown(self):
        for _ in xrange(self.queue.qsize()):
            self.queue.get()