1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
|
from __future__ import absolute_import
import os
import random
import re
import string
import time
def special_to_underscore(string, _matcher=re.compile(r'[^a-zA-Z0-9_]+')):
    """Replace every run of special characters in `string` with one underscore.

    A "special" character is anything other than an ASCII letter, an ASCII
    digit, or an underscore. Consecutive special characters collapse into a
    single underscore.

    `_matcher` is the pre-compiled pattern, bound once at definition time;
    callers are not expected to pass it.
    """
    sanitized = _matcher.sub('_', string)
    return sanitized
def random_string(length):
    """Return a string of `length` randomly chosen ASCII letters.

    Characters are drawn independently, with replacement, from
    `string.ascii_letters` using the `random` module. A `length` of zero or
    less yields an empty string.
    """
    alphabet = string.ascii_letters
    picked = []
    for _ in range(length):
        picked.append(random.choice(alphabet))
    return "".join(picked)
def env_kafka_version():
    """Return the Kafka version set in the OS environment as a tuple.

    Reads the ``KAFKA_VERSION`` environment variable and converts each
    dot-separated component to an int.

    Example: '0.8.1.1' --> (0, 8, 1, 1)

    Returns:
        A tuple of ints, or an empty tuple when ``KAFKA_VERSION`` is unset,
        empty, or contains only whitespace.

    Raises:
        ValueError: if any component is not an integer (e.g. 'trunk').
    """
    raw_version = os.environ.get('KAFKA_VERSION', '').strip()
    if not raw_version:
        # An exported-but-empty variable means "no version", same as unset;
        # without this guard int('') would raise ValueError.
        return ()
    return tuple(int(part) for part in raw_version.split('.'))
def assert_message_count(messages, num_messages):
    """Assert that exactly `num_messages` distinct messages were received.

    Two checks are made, each raising AssertionError on failure:
    first that `messages` has the expected length, then that no two
    messages share the same (key, value) pair.
    """
    # Everything expected must have arrived
    received = len(messages)
    assert received == num_messages

    # Duplicate detection looks at key/value only. Topic, partition, headers,
    # timestamp and any other attributes are not compared, so two messages
    # that differ only in those fields are counted as duplicates.
    seen_pairs = set((msg.key, msg.value) for msg in messages)
    distinct = len(seen_pairs)
    assert distinct == num_messages
class Timer(object):
    """Context manager that records how long its `with` body took.

    On entry `start` is set from `time.time()`; on exit `end` is set the
    same way and `interval` holds `end - start` in seconds. Exceptions
    raised inside the body are not suppressed.
    """

    def __enter__(self):
        started_at = time.time()
        self.start = started_at
        return self

    def __exit__(self, *args):
        stopped_at = time.time()
        self.end = stopped_at
        self.interval = stopped_at - self.start
|