1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772
|
from __future__ import absolute_import
from collections import namedtuple
from copy import deepcopy
import logging
import random
import sys
import time
import six
from kafka.client import KafkaClient
from kafka.common import (
OffsetFetchRequest, OffsetCommitRequest, OffsetRequest, FetchRequest,
check_error, NotLeaderForPartitionError, UnknownTopicOrPartitionError,
OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout,
FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError
)
from kafka.util import kafka_bytestring
logger = logging.getLogger(__name__)
OffsetsStruct = namedtuple("OffsetsStruct", ["fetch", "highwater", "commit", "task_done"])
DEFAULT_CONSUMER_CONFIG = {
'client_id': __name__,
'group_id': None,
'bootstrap_servers': [],
'socket_timeout_ms': 30 * 1000,
'fetch_message_max_bytes': 1024 * 1024,
'auto_offset_reset': 'largest',
'fetch_min_bytes': 1,
'fetch_wait_max_ms': 100,
'refresh_leader_backoff_ms': 200,
'deserializer_class': lambda msg: msg,
'auto_commit_enable': False,
'auto_commit_interval_ms': 60 * 1000,
'auto_commit_interval_messages': None,
'consumer_timeout_ms': -1,
# Currently unused
'socket_receive_buffer_bytes': 64 * 1024,
'num_consumer_fetchers': 1,
'default_fetcher_backoff_ms': 1000,
'queued_max_message_chunks': 10,
'rebalance_max_retries': 4,
'rebalance_backoff_ms': 2000,
}
DEPRECATED_CONFIG_KEYS = {
'metadata_broker_list': 'bootstrap_servers',
}
class KafkaConsumer(object):
"""A simpler kafka consumer"""
DEFAULT_CONFIG = deepcopy(DEFAULT_CONSUMER_CONFIG)
def __init__(self, *topics, **configs):
self.configure(**configs)
self.set_topic_partitions(*topics)
def configure(self, **configs):
"""Configure the consumer instance
Configuration settings can be passed to constructor,
otherwise defaults will be used:
Keyword Arguments:
bootstrap_servers (list): List of initial broker nodes the consumer
should contact to bootstrap initial cluster metadata. This does
not have to be the full node list. It just needs to have at
least one broker that will respond to a Metadata API Request.
client_id (str): a unique name for this client. Defaults to
'kafka.consumer.kafka'.
group_id (str): the name of the consumer group to join,
Offsets are fetched / committed to this group name.
fetch_message_max_bytes (int, optional): Maximum bytes for each
topic/partition fetch request. Defaults to 1024*1024.
fetch_min_bytes (int, optional): Minimum amount of data the server
should return for a fetch request, otherwise wait up to
fetch_wait_max_ms for more data to accumulate. Defaults to 1.
fetch_wait_max_ms (int, optional): Maximum time for the server to
block waiting for fetch_min_bytes messages to accumulate.
Defaults to 100.
refresh_leader_backoff_ms (int, optional): Milliseconds to backoff
when refreshing metadata on errors (subject to random jitter).
Defaults to 200.
socket_timeout_ms (int, optional): TCP socket timeout in
milliseconds. Defaults to 30*1000.
auto_offset_reset (str, optional): A policy for resetting offsets on
OffsetOutOfRange errors. 'smallest' will move to the oldest
available message, 'largest' will move to the most recent. Any
ofther value will raise the exception. Defaults to 'largest'.
deserializer_class (callable, optional): Any callable that takes a
raw message value and returns a deserialized value. Defaults to
lambda msg: msg.
auto_commit_enable (bool, optional): Enabling auto-commit will cause
the KafkaConsumer to periodically commit offsets without an
explicit call to commit(). Defaults to False.
auto_commit_interval_ms (int, optional): If auto_commit_enabled,
the milliseconds between automatic offset commits. Defaults to
60 * 1000.
auto_commit_interval_messages (int, optional): If
auto_commit_enabled, a number of messages consumed between
automatic offset commits. Defaults to None (disabled).
consumer_timeout_ms (int, optional): number of millisecond to throw
a timeout exception to the consumer if no message is available
for consumption. Defaults to -1 (dont throw exception).
Configuration parameters are described in more detail at
http://kafka.apache.org/documentation.html#highlevelconsumerapi
"""
configs = self._deprecate_configs(**configs)
self._config = {}
for key in self.DEFAULT_CONFIG:
self._config[key] = configs.pop(key, self.DEFAULT_CONFIG[key])
if configs:
raise KafkaConfigurationError('Unknown configuration key(s): ' +
str(list(configs.keys())))
if self._config['auto_commit_enable']:
if not self._config['group_id']:
raise KafkaConfigurationError(
'KafkaConsumer configured to auto-commit '
'without required consumer group (group_id)'
)
# Check auto-commit configuration
if self._config['auto_commit_enable']:
logger.info("Configuring consumer to auto-commit offsets")
self._reset_auto_commit()
if not self._config['bootstrap_servers']:
raise KafkaConfigurationError(
'bootstrap_servers required to configure KafkaConsumer'
)
self._client = KafkaClient(
self._config['bootstrap_servers'],
client_id=self._config['client_id'],
timeout=(self._config['socket_timeout_ms'] / 1000.0)
)
def set_topic_partitions(self, *topics):
"""
Set the topic/partitions to consume
Optionally specify offsets to start from
Accepts types:
* str (utf-8): topic name (will consume all available partitions)
* tuple: (topic, partition)
* dict:
- { topic: partition }
- { topic: [partition list] }
- { topic: (partition tuple,) }
Optionally, offsets can be specified directly:
* tuple: (topic, partition, offset)
* dict: { (topic, partition): offset, ... }
Example:
.. code:: python
kafka = KafkaConsumer()
# Consume topic1-all; topic2-partition2; topic3-partition0
kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0})
# Consume topic1-0 starting at offset 12, and topic2-1 at offset 45
# using tuples --
kafka.set_topic_partitions(("topic1", 0, 12), ("topic2", 1, 45))
# using dict --
kafka.set_topic_partitions({ ("topic1", 0): 12, ("topic2", 1): 45 })
"""
self._topics = []
self._client.load_metadata_for_topics()
# Setup offsets
self._offsets = OffsetsStruct(fetch=dict(),
commit=dict(),
highwater=dict(),
task_done=dict())
# Handle different topic types
for arg in topics:
# Topic name str -- all partitions
if isinstance(arg, (six.string_types, six.binary_type)):
topic = kafka_bytestring(arg)
for partition in self._client.get_partition_ids_for_topic(topic):
self._consume_topic_partition(topic, partition)
# (topic, partition [, offset]) tuple
elif isinstance(arg, tuple):
topic = kafka_bytestring(arg[0])
partition = arg[1]
self._consume_topic_partition(topic, partition)
if len(arg) == 3:
offset = arg[2]
self._offsets.fetch[(topic, partition)] = offset
# { topic: partitions, ... } dict
elif isinstance(arg, dict):
for key, value in six.iteritems(arg):
# key can be string (a topic)
if isinstance(key, (six.string_types, six.binary_type)):
topic = kafka_bytestring(key)
# topic: partition
if isinstance(value, int):
self._consume_topic_partition(topic, value)
# topic: [ partition1, partition2, ... ]
elif isinstance(value, (list, tuple)):
for partition in value:
self._consume_topic_partition(topic, partition)
else:
raise KafkaConfigurationError(
'Unknown topic type '
'(dict key must be int or list/tuple of ints)'
)
# (topic, partition): offset
elif isinstance(key, tuple):
topic = kafka_bytestring(key[0])
partition = key[1]
self._consume_topic_partition(topic, partition)
self._offsets.fetch[(topic, partition)] = value
else:
raise KafkaConfigurationError('Unknown topic type (%s)' % type(arg))
# If we have a consumer group, try to fetch stored offsets
if self._config['group_id']:
self._get_commit_offsets()
# Update missing fetch/commit offsets
for topic_partition in self._topics:
# Commit offsets default is None
if topic_partition not in self._offsets.commit:
self._offsets.commit[topic_partition] = None
# Skip if we already have a fetch offset from user args
if topic_partition not in self._offsets.fetch:
# Fetch offsets default is (1) commit
if self._offsets.commit[topic_partition] is not None:
self._offsets.fetch[topic_partition] = self._offsets.commit[topic_partition]
# or (2) auto reset
else:
self._offsets.fetch[topic_partition] = self._reset_partition_offset(topic_partition)
# highwater marks (received from server on fetch response)
# and task_done (set locally by user)
# should always get initialized to None
self._reset_highwater_offsets()
self._reset_task_done_offsets()
# Reset message iterator in case we were in the middle of one
self._reset_message_iterator()
def close(self):
"""Close this consumer's underlying client."""
self._client.close()
def next(self):
"""Return the next available message
Blocks indefinitely unless consumer_timeout_ms > 0
Returns:
a single KafkaMessage from the message iterator
Raises:
ConsumerTimeout after consumer_timeout_ms and no message
Note:
This is also the method called internally during iteration
"""
self._set_consumer_timeout_start()
while True:
try:
return six.next(self._get_message_iterator())
# Handle batch completion
except StopIteration:
self._reset_message_iterator()
self._check_consumer_timeout()
def fetch_messages(self):
"""Sends FetchRequests for all topic/partitions set for consumption
Returns:
Generator that yields KafkaMessage structs
after deserializing with the configured `deserializer_class`
Note:
Refreshes metadata on errors, and resets fetch offset on
OffsetOutOfRange, per the configured `auto_offset_reset` policy
See Also:
Key KafkaConsumer configuration parameters:
* `fetch_message_max_bytes`
* `fetch_max_wait_ms`
* `fetch_min_bytes`
* `deserializer_class`
* `auto_offset_reset`
"""
max_bytes = self._config['fetch_message_max_bytes']
max_wait_time = self._config['fetch_wait_max_ms']
min_bytes = self._config['fetch_min_bytes']
if not self._topics:
raise KafkaConfigurationError('No topics or partitions configured')
if not self._offsets.fetch:
raise KafkaConfigurationError(
'No fetch offsets found when calling fetch_messages'
)
fetches = [FetchRequest(topic, partition,
self._offsets.fetch[(topic, partition)],
max_bytes)
for (topic, partition) in self._topics]
# send_fetch_request will batch topic/partition requests by leader
responses = self._client.send_fetch_request(
fetches,
max_wait_time=max_wait_time,
min_bytes=min_bytes,
fail_on_error=False
)
for resp in responses:
if isinstance(resp, FailedPayloadsError):
logger.warning('FailedPayloadsError attempting to fetch data')
self._refresh_metadata_on_error()
continue
topic = kafka_bytestring(resp.topic)
partition = resp.partition
try:
check_error(resp)
except OffsetOutOfRangeError:
logger.warning('OffsetOutOfRange: topic %s, partition %d, '
'offset %d (Highwatermark: %d)',
topic, partition,
self._offsets.fetch[(topic, partition)],
resp.highwaterMark)
# Reset offset
self._offsets.fetch[(topic, partition)] = (
self._reset_partition_offset((topic, partition))
)
continue
except NotLeaderForPartitionError:
logger.warning("NotLeaderForPartitionError for %s - %d. "
"Metadata may be out of date",
topic, partition)
self._refresh_metadata_on_error()
continue
except RequestTimedOutError:
logger.warning("RequestTimedOutError for %s - %d",
topic, partition)
continue
# Track server highwater mark
self._offsets.highwater[(topic, partition)] = resp.highwaterMark
# Yield each message
# Kafka-python could raise an exception during iteration
# we are not catching -- user will need to address
for (offset, message) in resp.messages:
# deserializer_class could raise an exception here
val = self._config['deserializer_class'](message.value)
msg = KafkaMessage(topic, partition, offset, message.key, val)
# in some cases the server will return earlier messages
# than we requested. skip them per kafka spec
if offset < self._offsets.fetch[(topic, partition)]:
logger.debug('message offset less than fetched offset '
'skipping: %s', msg)
continue
# Only increment fetch offset
# if we safely got the message and deserialized
self._offsets.fetch[(topic, partition)] = offset + 1
# Then yield to user
yield msg
def get_partition_offsets(self, topic, partition, request_time_ms, max_num_offsets):
"""Request available fetch offsets for a single topic/partition
Keyword Arguments:
topic (str): topic for offset request
partition (int): partition for offset request
request_time_ms (int): Used to ask for all messages before a
certain time (ms). There are two special values.
Specify -1 to receive the latest offset (i.e. the offset of the
next coming message) and -2 to receive the earliest available
offset. Note that because offsets are pulled in descending
order, asking for the earliest offset will always return you a
single element.
max_num_offsets (int): Maximum offsets to include in the OffsetResponse
Returns:
a list of offsets in the OffsetResponse submitted for the provided
topic / partition. See:
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
"""
reqs = [OffsetRequest(topic, partition, request_time_ms, max_num_offsets)]
(resp,) = self._client.send_offset_request(reqs)
check_error(resp)
# Just for sanity..
# probably unnecessary
assert resp.topic == topic
assert resp.partition == partition
return resp.offsets
def offsets(self, group=None):
"""Get internal consumer offset values
Keyword Arguments:
group: Either "fetch", "commit", "task_done", or "highwater".
If no group specified, returns all groups.
Returns:
A copy of internal offsets struct
"""
if not group:
return {
'fetch': self.offsets('fetch'),
'commit': self.offsets('commit'),
'task_done': self.offsets('task_done'),
'highwater': self.offsets('highwater')
}
else:
return dict(deepcopy(getattr(self._offsets, group)))
def task_done(self, message):
"""Mark a fetched message as consumed.
Offsets for messages marked as "task_done" will be stored back
to the kafka cluster for this consumer group on commit()
Arguments:
message (KafkaMessage): the message to mark as complete
Returns:
True, unless the topic-partition for this message has not
been configured for the consumer. In normal operation, this
should not happen. But see github issue 364.
"""
topic_partition = (message.topic, message.partition)
if topic_partition not in self._topics:
logger.warning('Unrecognized topic/partition in task_done message: '
'{0}:{1}'.format(*topic_partition))
return False
offset = message.offset
# Warn on non-contiguous offsets
prev_done = self._offsets.task_done[topic_partition]
if prev_done is not None and offset != (prev_done + 1):
logger.warning('Marking task_done on a non-continuous offset: %d != %d + 1',
offset, prev_done)
# Warn on smaller offsets than previous commit
# "commit" offsets are actually the offset of the next message to fetch.
prev_commit = self._offsets.commit[topic_partition]
if prev_commit is not None and ((offset + 1) <= prev_commit):
logger.warning('Marking task_done on a previously committed offset?: %d (+1) <= %d',
offset, prev_commit)
self._offsets.task_done[topic_partition] = offset
# Check for auto-commit
if self._does_auto_commit_messages():
self._incr_auto_commit_message_count()
if self._should_auto_commit():
self.commit()
return True
def commit(self):
"""Store consumed message offsets (marked via task_done())
to kafka cluster for this consumer_group.
Returns:
True on success, or False if no offsets were found for commit
Note:
this functionality requires server version >=0.8.1.1
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
"""
if not self._config['group_id']:
logger.warning('Cannot commit without a group_id!')
raise KafkaConfigurationError(
'Attempted to commit offsets '
'without a configured consumer group (group_id)'
)
# API supports storing metadata with each commit
# but for now it is unused
metadata = b''
offsets = self._offsets.task_done
commits = []
for topic_partition, task_done_offset in six.iteritems(offsets):
# Skip if None
if task_done_offset is None:
continue
# Commit offsets as the next offset to fetch
# which is consistent with the Java Client
# task_done is marked by messages consumed,
# so add one to mark the next message for fetching
commit_offset = (task_done_offset + 1)
# Skip if no change from previous committed
if commit_offset == self._offsets.commit[topic_partition]:
continue
commits.append(
OffsetCommitRequest(topic_partition[0], topic_partition[1],
commit_offset, metadata)
)
if commits:
logger.info('committing consumer offsets to group %s', self._config['group_id'])
resps = self._client.send_offset_commit_request(
kafka_bytestring(self._config['group_id']), commits,
fail_on_error=False
)
for r in resps:
check_error(r)
topic_partition = (r.topic, r.partition)
task_done = self._offsets.task_done[topic_partition]
self._offsets.commit[topic_partition] = (task_done + 1)
if self._config['auto_commit_enable']:
self._reset_auto_commit()
return True
else:
logger.info('No new offsets found to commit in group %s', self._config['group_id'])
return False
#
# Topic/partition management private methods
#
def _consume_topic_partition(self, topic, partition):
topic = kafka_bytestring(topic)
if not isinstance(partition, int):
raise KafkaConfigurationError('Unknown partition type (%s) '
'-- expected int' % type(partition))
if topic not in self._client.topic_partitions:
raise UnknownTopicOrPartitionError("Topic %s not found in broker metadata" % topic)
if partition not in self._client.get_partition_ids_for_topic(topic):
raise UnknownTopicOrPartitionError("Partition %d not found in Topic %s "
"in broker metadata" % (partition, topic))
logger.info("Configuring consumer to fetch topic '%s', partition %d", topic, partition)
self._topics.append((topic, partition))
def _refresh_metadata_on_error(self):
refresh_ms = self._config['refresh_leader_backoff_ms']
jitter_pct = 0.20
sleep_ms = random.randint(
int((1.0 - 0.5 * jitter_pct) * refresh_ms),
int((1.0 + 0.5 * jitter_pct) * refresh_ms)
)
while True:
logger.info("Sleeping for refresh_leader_backoff_ms: %d", sleep_ms)
time.sleep(sleep_ms / 1000.0)
try:
self._client.load_metadata_for_topics()
except KafkaUnavailableError:
logger.warning("Unable to refresh topic metadata... cluster unavailable")
self._check_consumer_timeout()
else:
logger.info("Topic metadata refreshed")
return
#
# Offset-managment private methods
#
def _get_commit_offsets(self):
logger.info("Consumer fetching stored offsets")
for topic_partition in self._topics:
(resp,) = self._client.send_offset_fetch_request(
kafka_bytestring(self._config['group_id']),
[OffsetFetchRequest(topic_partition[0], topic_partition[1])],
fail_on_error=False)
try:
check_error(resp)
# API spec says server wont set an error here
# but 0.8.1.1 does actually...
except UnknownTopicOrPartitionError:
pass
# -1 offset signals no commit is currently stored
if resp.offset == -1:
self._offsets.commit[topic_partition] = None
# Otherwise we committed the stored offset
# and need to fetch the next one
else:
self._offsets.commit[topic_partition] = resp.offset
def _reset_highwater_offsets(self):
for topic_partition in self._topics:
self._offsets.highwater[topic_partition] = None
def _reset_task_done_offsets(self):
for topic_partition in self._topics:
self._offsets.task_done[topic_partition] = None
def _reset_partition_offset(self, topic_partition):
(topic, partition) = topic_partition
LATEST = -1
EARLIEST = -2
request_time_ms = None
if self._config['auto_offset_reset'] == 'largest':
request_time_ms = LATEST
elif self._config['auto_offset_reset'] == 'smallest':
request_time_ms = EARLIEST
else:
# Let's raise an reasonable exception type if user calls
# outside of an exception context
if sys.exc_info() == (None, None, None):
raise OffsetOutOfRangeError('Cannot reset partition offsets without a '
'valid auto_offset_reset setting '
'(largest|smallest)')
# Otherwise we should re-raise the upstream exception
# b/c it typically includes additional data about
# the request that triggered it, and we do not want to drop that
raise # pylint: disable-msg=E0704
(offset, ) = self.get_partition_offsets(topic, partition,
request_time_ms, max_num_offsets=1)
return offset
#
# Consumer Timeout private methods
#
def _set_consumer_timeout_start(self):
self._consumer_timeout = False
if self._config['consumer_timeout_ms'] >= 0:
self._consumer_timeout = time.time() + (self._config['consumer_timeout_ms'] / 1000.0)
def _check_consumer_timeout(self):
if self._consumer_timeout and time.time() > self._consumer_timeout:
raise ConsumerTimeout('Consumer timed out after %d ms' % + self._config['consumer_timeout_ms'])
#
# Autocommit private methods
#
def _should_auto_commit(self):
if self._does_auto_commit_ms():
if time.time() >= self._next_commit_time:
return True
if self._does_auto_commit_messages():
if self._uncommitted_message_count >= self._config['auto_commit_interval_messages']:
return True
return False
def _reset_auto_commit(self):
self._uncommitted_message_count = 0
self._next_commit_time = None
if self._does_auto_commit_ms():
self._next_commit_time = time.time() + (self._config['auto_commit_interval_ms'] / 1000.0)
def _incr_auto_commit_message_count(self, n=1):
self._uncommitted_message_count += n
def _does_auto_commit_ms(self):
if not self._config['auto_commit_enable']:
return False
conf = self._config['auto_commit_interval_ms']
if conf is not None and conf > 0:
return True
return False
def _does_auto_commit_messages(self):
if not self._config['auto_commit_enable']:
return False
conf = self._config['auto_commit_interval_messages']
if conf is not None and conf > 0:
return True
return False
#
# Message iterator private methods
#
def __iter__(self):
return self
def __next__(self):
return self.next()
def _get_message_iterator(self):
# Fetch a new batch if needed
if self._msg_iter is None:
self._msg_iter = self.fetch_messages()
return self._msg_iter
def _reset_message_iterator(self):
self._msg_iter = None
#
# python private methods
#
def __repr__(self):
return '<{0} topics=({1})>'.format(
self.__class__.__name__,
'|'.join(["%s-%d" % topic_partition
for topic_partition in self._topics])
)
#
# other private methods
#
def _deprecate_configs(self, **configs):
for old, new in six.iteritems(DEPRECATED_CONFIG_KEYS):
if old in configs:
logger.warning('Deprecated Kafka Consumer configuration: %s. '
'Please use %s instead.', old, new)
old_value = configs.pop(old)
if new not in configs:
configs[new] = old_value
return configs
|