1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
|
#!/usr/bin/env python
import confluent_kafka
import confluent_kafka.avro
import logging
class CountingFilter(logging.Filter):
def __init__(self, name):
self.name = name
self.cnt = 0
def filter(self, record):
print(self.name, record.getMessage())
self.cnt += 1
print(record)
def test_logging_consumer():
""" Tests that logging works """
logger = logging.getLogger('consumer')
logger.setLevel(logging.DEBUG)
f = CountingFilter('consumer')
logger.addFilter(f)
kc = confluent_kafka.Consumer({'group.id': 'test',
'debug': 'all'},
logger=logger)
while f.cnt == 0:
kc.poll(timeout=0.5)
print('%s: %d log messages seen' % (f.name, f.cnt))
kc.close()
def test_logging_avro_consumer():
""" Tests that logging works """
logger = logging.getLogger('avroconsumer')
logger.setLevel(logging.DEBUG)
f = CountingFilter('avroconsumer')
logger.addFilter(f)
kc = confluent_kafka.avro.AvroConsumer({'schema.registry.url': 'http://example.com',
'group.id': 'test',
'debug': 'all'},
logger=logger)
while f.cnt == 0:
kc.poll(timeout=0.5)
print('%s: %d log messages seen' % (f.name, f.cnt))
kc.close()
def test_logging_producer():
""" Tests that logging works """
logger = logging.getLogger('producer')
logger.setLevel(logging.DEBUG)
f = CountingFilter('producer')
logger.addFilter(f)
p = confluent_kafka.Producer({'debug': 'all'}, logger=logger)
while f.cnt == 0:
p.poll(timeout=0.5)
print('%s: %d log messages seen' % (f.name, f.cnt))
def test_logging_avro_producer():
""" Tests that logging works """
logger = logging.getLogger('avroproducer')
logger.setLevel(logging.DEBUG)
f = CountingFilter('avroproducer')
logger.addFilter(f)
p = confluent_kafka.avro.AvroProducer({'schema.registry.url': 'http://example.com',
'debug': 'all'},
logger=logger)
while f.cnt == 0:
p.poll(timeout=0.5)
print('%s: %d log messages seen' % (f.name, f.cnt))
def test_logging_constructor():
""" Verify different forms of constructors """
for how in ['dict', 'dict+kwarg', 'kwarg']:
logger = logging.getLogger('producer: ' + how)
logger.setLevel(logging.DEBUG)
f = CountingFilter(logger.name)
logger.addFilter(f)
if how == 'dict':
p = confluent_kafka.Producer({'debug': 'all', 'logger': logger})
elif how == 'dict+kwarg':
p = confluent_kafka.Producer({'debug': 'all'}, logger=logger)
elif how == 'kwarg':
conf = {'debug': 'all', 'logger': logger}
p = confluent_kafka.Producer(**conf)
else:
raise RuntimeError('Not reached')
print('Test %s with %s' % (p, how))
while f.cnt == 0:
p.poll(timeout=0.5)
print('%s: %s: %d log messages seen' % (how, f.name, f.cnt))
|