1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444
|
from __future__ import absolute_import
try:
from itertools import zip_longest as izip_longest, repeat # pylint: disable-msg=E0611
except ImportError:
from itertools import izip_longest as izip_longest, repeat # python 2
import logging
try:
import queue # python 3
except ImportError:
import Queue as queue # python 2
import sys
import time
import six
from .base import (
Consumer,
FETCH_DEFAULT_BLOCK_TIMEOUT,
AUTO_COMMIT_MSG_COUNT,
AUTO_COMMIT_INTERVAL,
FETCH_MIN_BYTES,
FETCH_BUFFER_SIZE_BYTES,
MAX_FETCH_BUFFER_SIZE_BYTES,
FETCH_MAX_WAIT_TIME,
ITER_TIMEOUT_SECONDS,
NO_MESSAGES_WAIT_TIME_SECONDS
)
from ..common import (
FetchRequest, KafkaError, OffsetRequest,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
UnknownTopicOrPartitionError, NotLeaderForPartitionError,
OffsetOutOfRangeError, FailedPayloadsError, check_error
)
log = logging.getLogger(__name__)
class FetchContext(object):
    """
    Context manager that temporarily overrides a consumer's fetch settings.

    On entry the consumer's ``fetch_max_wait_time`` and ``fetch_min_bytes``
    are saved and then replaced according to the blocking mode. On exit the
    saved values are written back.

    Arguments:
        consumer: object exposing ``fetch_max_wait_time`` and
            ``fetch_min_bytes`` attributes
        block: truthy if the fetch should wait for data
        timeout: how long to block, in seconds. A falsy value (None or 0)
            selects FETCH_DEFAULT_BLOCK_TIMEOUT. Ignored when not blocking.
    """

    def __init__(self, consumer, block, timeout):
        self.consumer = consumer
        self.block = block
        # Always define the attribute so a non-blocking context can be
        # inspected without raising AttributeError.
        self.timeout = None

        if block:
            if not timeout:
                timeout = FETCH_DEFAULT_BLOCK_TIMEOUT
            # Seconds -> milliseconds; this value is installed as the
            # consumer's fetch_max_wait_time on entry.
            self.timeout = timeout * 1000

    def __enter__(self):
        """Set fetch values based on blocking status and return the context"""
        self.orig_fetch_max_wait_time = self.consumer.fetch_max_wait_time
        self.orig_fetch_min_bytes = self.consumer.fetch_min_bytes
        if self.block:
            # Wait up to the timeout for at least one byte of data
            self.consumer.fetch_max_wait_time = self.timeout
            self.consumer.fetch_min_bytes = 1
        else:
            # Do not require any data; the max wait time is left untouched
            self.consumer.fetch_min_bytes = 0
        return self

    def __exit__(self, type, value, traceback):
        """Reset values (exceptions from the body are not suppressed)"""
        self.consumer.fetch_max_wait_time = self.orig_fetch_max_wait_time
        self.consumer.fetch_min_bytes = self.orig_fetch_min_bytes
class SimpleConsumer(Consumer):
    """
    A simple consumer implementation that consumes all/specified partitions
    for a topic

    Arguments:
        client: a connected KafkaClient
        group: a name for this consumer, used for offset storage and must be
            unique. If you are connecting to a server that does not support
            offset commit/fetch (any prior to 0.8.1.1), then you *must* set
            this to None
        topic: the topic to consume

    Keyword Arguments:
        partitions: An optional list of partitions to consume the data from

        auto_commit: default True. Whether or not to auto commit the offsets

        auto_commit_every_n: default AUTO_COMMIT_MSG_COUNT. How many messages
            to consume before a commit

        auto_commit_every_t: default AUTO_COMMIT_INTERVAL. How much time
            (in milliseconds) to wait before commit

        fetch_size_bytes: default FETCH_MIN_BYTES. Stored as
            ``fetch_min_bytes`` and sent as ``min_bytes`` of a fetch request.
            Note that message retrieval wraps every fetch in a FetchContext,
            which overrides this value for the duration of the fetch.

        buffer_size: default FETCH_BUFFER_SIZE_BYTES. Initial number of bytes
            to tell kafka we have available. Within one fetch this is doubled
            for a retry whenever a message does not fit.

        max_buffer_size: default MAX_FETCH_BUFFER_SIZE_BYTES. Max number of
            bytes to tell kafka we have available. None means no limit.

        iter_timeout: default None. How much time (in seconds) to wait for a
            message in the iterator before exiting. None means no
            timeout, so it will wait forever.

        auto_offset_reset: default largest. Reset partition offsets upon
            OffsetOutOfRangeError. Valid values are largest and smallest.
            Otherwise, do not reset the offsets and raise
            OffsetOutOfRangeError.

    Auto commit details:
    If both auto_commit_every_n and auto_commit_every_t are set, they will
    reset one another when one is triggered. These triggers simply call the
    commit method on this class. A manual call to commit will also reset
    these triggers
    """

    def __init__(self, client, group, topic, auto_commit=True, partitions=None,
                 auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
                 auto_commit_every_t=AUTO_COMMIT_INTERVAL,
                 fetch_size_bytes=FETCH_MIN_BYTES,
                 buffer_size=FETCH_BUFFER_SIZE_BYTES,
                 max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
                 iter_timeout=None,
                 auto_offset_reset='largest'):
        """
        Initialise the base Consumer and the fetch-related state.

        Raises:
            ValueError: if max_buffer_size is not None and buffer_size is
                greater than it.
        """
        super(SimpleConsumer, self).__init__(
            client, group, topic,
            partitions=partitions,
            auto_commit=auto_commit,
            auto_commit_every_n=auto_commit_every_n,
            auto_commit_every_t=auto_commit_every_t)

        if max_buffer_size is not None and buffer_size > max_buffer_size:
            raise ValueError('buffer_size (%d) is greater than '
                             'max_buffer_size (%d)' %
                             (buffer_size, max_buffer_size))
        self.buffer_size = buffer_size
        self.max_buffer_size = max_buffer_size
        self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
        self.fetch_min_bytes = fetch_size_bytes
        # self.offsets is expected to be populated by Consumer.__init__.
        # fetch_offsets tracks the next offset to *fetch* per partition,
        # which can run ahead of the consumed offsets kept in self.offsets.
        self.fetch_offsets = self.offsets.copy()
        self.iter_timeout = iter_timeout
        self.auto_offset_reset = auto_offset_reset
        # Fetched-but-not-yet-returned (partition, message) pairs
        self.queue = queue.Queue()

    def __repr__(self):
        # list(...) keeps the output identical on Python 2 and 3; on
        # Python 3 str(dict.keys()) would render as 'dict_keys([...])'.
        return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
            (self.group, self.topic, list(self.offsets.keys()))

    def reset_partition_offset(self, partition):
        """Update offsets using auto_offset_reset policy (smallest|largest)

        Arguments:
            partition (int): the partition for which offsets should be updated

        Returns: Updated offset on success, None on failure

        Raises:
            OffsetOutOfRangeError: if auto_offset_reset is neither 'largest'
                nor 'smallest' and no exception is currently being handled.
                If an exception is being handled, that exception is
                re-raised instead.
        """
        LATEST = -1
        EARLIEST = -2
        if self.auto_offset_reset == 'largest':
            reqs = [OffsetRequest(self.topic, partition, LATEST, 1)]
        elif self.auto_offset_reset == 'smallest':
            reqs = [OffsetRequest(self.topic, partition, EARLIEST, 1)]
        else:
            # Let's raise a reasonable exception type if user calls
            # outside of an exception context
            if sys.exc_info() == (None, None, None):
                raise OffsetOutOfRangeError('Cannot reset partition offsets without a '
                                            'valid auto_offset_reset setting '
                                            '(largest|smallest)')
            # Otherwise we should re-raise the upstream exception
            # b/c it typically includes additional data about
            # the request that triggered it, and we do not want to drop that
            raise  # pylint: disable-msg=E0704

        # send_offset_request
        log.info('Resetting topic-partition offset to %s for %s:%d',
                 self.auto_offset_reset, self.topic, partition)
        try:
            (resp, ) = self.client.send_offset_request(reqs)
        except KafkaError as e:
            # Best effort: report the failure and leave the offsets untouched
            log.error('%s sending offset request for %s:%d',
                      e.__class__.__name__, self.topic, partition)
            return None

        self.offsets[partition] = resp.offsets[0]
        self.fetch_offsets[partition] = resp.offsets[0]
        return resp.offsets[0]

    def seek(self, offset, whence=None, partition=None):
        """
        Alter the current offset in the consumer, similar to fseek

        Arguments:
            offset: how much to modify the offset
            whence: where to modify it from, default is None

                * None is an absolute offset
                * 0    is relative to the earliest available offset (head)
                * 1    is relative to the current offset
                * 2    is relative to the latest known offset (tail)

            partition: modify which partition, default is None.
                If partition is None, would modify all partitions.

        Raises:
            ValueError: if whence is not one of None, 0, 1 or 2.

        After the offsets are changed, the fetch offsets and the internal
        message queue are reset, and the new position is committed when
        auto_commit is enabled.
        """
        if whence is None:  # set an absolute offset
            if partition is None:
                for tmp_partition in self.offsets:
                    self.offsets[tmp_partition] = offset
            else:
                self.offsets[partition] = offset
        elif whence == 1:  # relative to current position
            if partition is None:
                for tmp_partition, _offset in self.offsets.items():
                    self.offsets[tmp_partition] = _offset + offset
            else:
                self.offsets[partition] += offset
        elif whence in (0, 2):  # relative to beginning or end
            # -2 requests the earliest offset, -1 the latest one
            request_time = -2 if whence == 0 else -1
            deltas = {}
            if partition is None:
                # divide the request offset by number of partitions,
                # distribute the remainder evenly: the first `rem`
                # partitions get one extra
                (delta, rem) = divmod(offset, len(self.offsets))
                for tmp_partition, r in izip_longest(self.offsets.keys(),
                                                     repeat(1, rem),
                                                     fillvalue=0):
                    deltas[tmp_partition] = delta + r
                targets = list(self.offsets.keys())
            else:
                deltas[partition] = offset
                targets = [partition]

            reqs = [OffsetRequest(self.topic, tmp_partition, request_time, 1)
                    for tmp_partition in targets]

            resps = self.client.send_offset_request(reqs)
            for resp in resps:
                self.offsets[resp.partition] = \
                    resp.offsets[0] + deltas[resp.partition]
        else:
            # %r (not %d) so that a non-numeric whence still produces the
            # intended ValueError instead of a formatting TypeError
            raise ValueError('Unexpected value for `whence`, %r' % (whence,))

        # Reset queue and fetch offsets since they are invalid
        self.fetch_offsets = self.offsets.copy()
        self.count_since_commit += 1
        if self.auto_commit:
            self.commit()

        self.queue = queue.Queue()

    def get_messages(self, count=1, block=True, timeout=0.1):
        """
        Fetch the specified number of messages

        Keyword Arguments:
            count: Indicates the maximum number of messages to be fetched
            block: If True, the API will block till all messages are fetched.
                If block is a positive integer the API will block until that
                many messages are fetched.
            timeout: When blocking is requested the function will block for
                the specified time (in seconds) until count messages is
                fetched. If None, it will block forever.

        Returns:
            A list of messages, or of (partition, message) tuples when
            self.partition_info is set. May hold fewer than count entries.
        """
        messages = []
        # Absolute deadline; None means "no deadline"
        deadline = None
        if timeout is not None:
            deadline = timeout + time.time()

        new_offsets = {}
        log.debug('getting %d messages', count)
        while len(messages) < count:
            # Remaining time for this attempt. Kept as None when there is no
            # deadline (subtracting from None would raise TypeError).
            if deadline is None:
                block_time = None
            else:
                block_time = deadline - time.time()
            log.debug('calling _get_message block=%s timeout=%s', block, block_time)
            block_next_call = block is True or block > len(messages)
            result = self._get_message(block_next_call, block_time,
                                       get_partition_info=True,
                                       update_offset=False)
            log.debug('got %s from _get_messages', result)
            if not result:
                if block_next_call and (deadline is None or time.time() <= deadline):
                    continue
                break

            partition, message = result
            _msg = (partition, message) if self.partition_info else message
            messages.append(_msg)
            new_offsets[partition] = message.offset + 1

        # Update and commit offsets if necessary
        self.offsets.update(new_offsets)
        self.count_since_commit += len(messages)
        self._auto_commit()
        log.debug('got %d messages: %s', len(messages), messages)
        return messages

    def get_message(self, block=True, timeout=0.1, get_partition_info=None):
        """
        Fetch a single message, updating the consumed offset.

        See _get_message for the meaning of the arguments and the return
        value.
        """
        return self._get_message(block, timeout, get_partition_info)

    def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
                     update_offset=True):
        """
        If no messages can be fetched, returns None.
        If get_partition_info is None, it defaults to self.partition_info
        If get_partition_info is True, returns (partition, message)
        If get_partition_info is False, returns message

        Keyword Arguments:
            block: if truthy, keep fetching until a message arrives or the
                timeout elapses; otherwise fetch at most once.
            timeout: seconds to keep trying when blocking. None means keep
                trying until a message arrives.
            update_offset: if True, record the message as consumed (update
                self.offsets, bump the commit counter, run auto commit).
        """
        start_at = time.time()
        while self.queue.empty():
            # We're out of messages, go grab some more.
            log.debug('internal queue empty, fetching more messages')
            with FetchContext(self, block, timeout):
                self._fetch()

            if not block:
                break
            # No timeout means block until a message shows up
            if timeout is not None and time.time() > (start_at + timeout):
                break

        try:
            partition, message = self.queue.get_nowait()
        except queue.Empty:
            log.debug('internal queue empty after fetch - returning None')
            return None

        if update_offset:
            # Update partition offset
            self.offsets[partition] = message.offset + 1

            # Count, check and commit messages if necessary
            self.count_since_commit += 1
            self._auto_commit()

        if get_partition_info is None:
            get_partition_info = self.partition_info
        if get_partition_info:
            return partition, message
        return message

    def __iter__(self):
        """
        Yield messages as they become available.

        With iter_timeout set, iteration stops the first time no message
        arrives within that many seconds. Without it, the iterator never
        stops: it polls with ITER_TIMEOUT_SECONDS and sleeps for
        NO_MESSAGES_WAIT_TIME_SECONDS between empty polls.
        """
        if self.iter_timeout is None:
            timeout = ITER_TIMEOUT_SECONDS
        else:
            timeout = self.iter_timeout

        while True:
            message = self.get_message(True, timeout)
            if message:
                yield message
            elif self.iter_timeout is None:
                # We did not receive any message yet but we don't have a
                # timeout, so give up the CPU for a while before trying again
                time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
            else:
                # Timed out waiting for a message
                break

    def _fetch(self):
        """
        Fetch messages for every partition and put them on the internal queue.

        Partitions that fail with OffsetOutOfRangeError (after an offset
        reset), FailedPayloadsError or ConsumerFetchSizeTooSmall (with a
        doubled buffer) are retried until none is left.

        Raises:
            UnknownTopicOrPartitionError: re-raised after the topic metadata
                has been reset.
            ConsumerFetchSizeTooSmall: re-raised when the buffer is already
                at max_buffer_size.
        """
        # Create fetch request payloads for all the partitions
        partitions = dict.fromkeys(self.fetch_offsets, self.buffer_size)
        while partitions:
            requests = [FetchRequest(self.topic, part,
                                     self.fetch_offsets[part], size)
                        for part, size in six.iteritems(partitions)]
            # Send request
            responses = self.client.send_fetch_request(
                requests,
                max_wait_time=int(self.fetch_max_wait_time),
                min_bytes=self.fetch_min_bytes,
                fail_on_error=False
            )

            retry_partitions = {}
            for resp in responses:

                try:
                    check_error(resp)
                except UnknownTopicOrPartitionError:
                    log.error('UnknownTopicOrPartitionError for %s:%d',
                              resp.topic, resp.partition)
                    self.client.reset_topic_metadata(resp.topic)
                    raise
                except NotLeaderForPartitionError:
                    # Metadata is reset, but the partition is not retried
                    # within this call
                    log.error('NotLeaderForPartitionError for %s:%d',
                              resp.topic, resp.partition)
                    self.client.reset_topic_metadata(resp.topic)
                    continue
                except OffsetOutOfRangeError:
                    log.warning('OffsetOutOfRangeError for %s:%d. '
                                'Resetting partition offset...',
                                resp.topic, resp.partition)
                    self.reset_partition_offset(resp.partition)
                    # Retry this partition
                    retry_partitions[resp.partition] = partitions[resp.partition]
                    continue
                except FailedPayloadsError as e:
                    # The partition is read from the failed payload carried
                    # by the exception, not from resp
                    log.warning('FailedPayloadsError for %s:%d',
                                e.payload.topic, e.payload.partition)
                    # Retry this partition
                    retry_partitions[e.payload.partition] = partitions[e.payload.partition]
                    continue

                partition = resp.partition
                buffer_size = partitions[partition]
                try:
                    for message in resp.messages:
                        if message.offset < self.fetch_offsets[partition]:
                            log.debug('Skipping message %s because its offset is less than the consumer offset',
                                      message)
                            continue
                        # Put the message in our queue
                        self.queue.put((partition, message))
                        self.fetch_offsets[partition] = message.offset + 1
                except ConsumerFetchSizeTooSmall:
                    if (self.max_buffer_size is not None and
                            buffer_size == self.max_buffer_size):
                        log.error('Max fetch size %d too small',
                                  self.max_buffer_size)
                        raise
                    if self.max_buffer_size is None:
                        buffer_size *= 2
                    else:
                        buffer_size = min(buffer_size * 2,
                                          self.max_buffer_size)
                    log.warning('Fetch size too small, increase to %d (2x) '
                                'and retry', buffer_size)
                    retry_partitions[partition] = buffer_size
                except ConsumerNoMoreData as e:
                    log.debug('Iteration was ended by %r', e)
                except StopIteration:
                    # Stop iterating through this partition
                    log.debug('Done iterating over partition %s', partition)
            partitions = retry_partitions
|