#!/usr/bin/env python
#
# Copyright 2019 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import sys
from confluent_kafka.experimental.aio import AIOProducer
from confluent_kafka.experimental.aio import AIOConsumer
import random
import logging
import signal
# This example demonstrates comprehensive AsyncIO usage patterns with Kafka:
# - Event loop safe callbacks that don't block the loop
# - Batched async produce with transaction handling
# - Proper async consumer with partition management
# - Graceful shutdown with signal handling
# - Thread pool integration for blocking operations
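# Usage (inferred from the sys.argv reads below):
#   python3 <this script> <bootstrap_servers> <topic>
# sys.argv[1] = bootstrap servers, sys.argv[2] = topic to produce to and consume from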
logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
running = True
# AsyncIO Pattern: Event loop safe callbacks
# These callbacks are automatically scheduled onto the event loop by AIOProducer/AIOConsumer
# ensuring they don't block the loop and can safely interact with other async operations
async def error_cb(err):
logger.error(f'Kafka error: {err}')
async def throttle_cb(event):
logger.warning(f'Kafka throttle event: {event}')
async def stats_cb(stats_json_str):
logger.info(f'Kafka stats: {stats_json_str}')
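    # stats_cb only fires when statistics.interval.ms is set to a non-zero
    # value (see configure_common below). The payload is librdkafka's
    # statistics JSON document; a minimal sketch of inspecting it
    # (assumes the standard json module is imported):
    #     stats = json.loads(stats_json_str)
    #     logger.info(f"brokers in stats: {len(stats.get('brokers', {}))}")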
def configure_common(conf):
bootstrap_servers = sys.argv[1]
conf.update({
'bootstrap.servers': bootstrap_servers,
'logger': logger,
'debug': 'conf',
'error_cb': error_cb,
'throttle_cb': throttle_cb,
'stats_cb': stats_cb,
'statistics.interval.ms': 5000,
})
return conf
async def run_producer():
topic = sys.argv[2]
# AsyncIO Pattern: Non-blocking producer with thread pool
# max_workers=5 creates a ThreadPoolExecutor for offloading blocking librdkafka calls
producer = AIOProducer(configure_common(
{
'transactional.id': 'producer1'
}), max_workers=5)
# AsyncIO Pattern: Async transaction lifecycle
# All transaction operations are awaitable and won't block the event loop
await producer.init_transactions()
# TODO: handle exceptions with transactional API
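    # A hedged sketch of what that handling could look like, assuming the
    # synchronous client's transactional error model (KafkaException wrapping
    # a KafkaError) carries over to the experimental AIO wrapper:
    #     try:
    #         await producer.commit_transaction()
    #     except KafkaException as e:
    #         if e.args[0].txn_requires_abort():
    #             await producer.abort_transaction()
    #         elif e.args[0].retriable():
    #             pass  # retry the failed operation
    #         else:
    #             raise  # fatal error, re-raise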
transaction_active = False
try:
while running:
await producer.begin_transaction()
transaction_active = True
# AsyncIO Pattern: Batched async produce with concurrent futures
            # Creates 10 concurrent produce operations, each returning a Future
# that resolves when the message is delivered or fails
produce_futures = [await producer.produce(
topic=topic,
key=f'testkey{i}',
value=f'testvalue{i}')
for i in range(10)]
logger.info(f"Produced {len(produce_futures)} messages")
            # Force a flush of the local buffer so messages are in flight
            # before awaiting their delivery
# TODO: this shouldn't be strictly necessary in the future
await producer.flush()
# Wait for all produce operations to complete concurrently
for msg in await asyncio.gather(*produce_futures):
logger.info(
'Produced to: {} [{}] @ {}'.format(msg.topic(),
msg.partition(),
msg.offset()))
# AsyncIO Pattern: Non-blocking transaction commit
await producer.commit_transaction()
transaction_active = False
            # Use asyncio.sleep() instead of time.sleep() to yield control to the event loop
            # Change this to sleep(0) in a real application; the 1s delay here
            # just mimics doing external work on the event loop
await asyncio.sleep(1)
except Exception as e:
logger.error(e)
finally:
# AsyncIO Pattern: Proper async cleanup
# Always clean up resources asynchronously to avoid blocking the event loop
if transaction_active:
await producer.abort_transaction()
await producer.close() # Stops background tasks and closes connections
logger.info('Closed producer')
async def run_consumer():
topic = sys.argv[2]
group_id = f'{topic}_{random.randint(1, 1000)}'
# AsyncIO Pattern: Non-blocking consumer with manual offset management
# Callbacks will be scheduled on the event loop automatically
consumer = AIOConsumer(configure_common(
{
'group.id': group_id,
'auto.offset.reset': 'latest',
'enable.auto.commit': 'false', # Manual commit for precise control
'enable.auto.offset.store': 'false', # Manual offset storage
'partition.assignment.strategy': 'cooperative-sticky',
}))
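    # Note: with the cooperative-sticky strategy rebalances are incremental,
    # so on_assign/on_revoke receive only the partitions being added or
    # removed, not the full assignment, which is why incremental_assign()
    # is used in the callback below.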
# AsyncIO Pattern: Async rebalance callbacks
# These callbacks can perform async operations safely within the event loop
async def on_assign(consumer, partitions):
        # incremental_assign must be called here before the assigned partitions
        # can be paused; if the callback doesn't call it, the consumer applies
        # the assignment automatically after the callback returns.
await consumer.incremental_assign(partitions)
await consumer.pause(partitions) # Demonstrates async partition control
logger.debug(f'on_assign {partitions}')
        # Resume the partitions immediately; the pause above is only a demonstration
await consumer.resume(partitions)
async def on_revoke(consumer, partitions):
        logger.debug(f'before on_revoke {partitions}')
try:
# AsyncIO Pattern: Non-blocking commit during rebalance
await consumer.commit() # Ensure offsets are committed before losing partitions
except Exception as e:
logger.info(f'Error during commit: {e}')
logger.debug(f'after on_revoke {partitions}')
async def on_lost(consumer, partitions):
logger.debug(f'on_lost {partitions}')
try:
await consumer.subscribe([topic],
on_assign=on_assign,
on_revoke=on_revoke,
                                 # Remember to set an on_lost callback
                                 # if you're committing on revocation,
# as lost partitions cannot be committed
on_lost=on_lost)
i = 0
while running:
# AsyncIO Pattern: Non-blocking message polling
# poll() returns a coroutine that yields control back to the event loop
message = await consumer.poll(1.0)
if message is None:
continue
if i % 100 == 0:
# AsyncIO Pattern: Async metadata operations
# Both assignment() and position() are async and won't block the loop
position = await consumer.position(await consumer.assignment())
logger.info(f'Current position: {position}')
await consumer.commit() # Async commit of stored offsets
logger.info('Stored offsets were committed')
err = message.error()
if err:
logger.error(f'Error: {err}')
else:
logger.info(f'Consumed: {message.value()}')
# AsyncIO Pattern: Async offset storage
await consumer.store_offsets(message=message)
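                # With enable.auto.offset.store=false, the call above only
                # marks this message's offset locally; it is actually
                # committed by the periodic commit() earlier in the loop,
                # giving at-least-once semantics.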
i += 1
finally:
# AsyncIO Pattern: Proper async consumer cleanup
# Always unsubscribe and close asynchronously
await consumer.unsubscribe() # Leave consumer group gracefully
await consumer.close() # Close connections and stop background tasks
logger.info('Closed consumer')
# AsyncIO Pattern: Signal handling for graceful shutdown
# Sets a flag that async tasks check to terminate cleanly
def signal_handler(*_):
global running
logger.info('Signal received, shutting down...')
running = False
async def main():
# AsyncIO Pattern: Signal handling setup
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
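    # An asyncio-native alternative (a sketch, not used here; POSIX-only,
    # whereas signal.signal() also works on Windows) would be to register
    # the handler on the running loop:
    #     asyncio.get_running_loop().add_signal_handler(
    #         signal.SIGINT, signal_handler)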
# AsyncIO Pattern: Concurrent task execution
# Both producer and consumer run concurrently in the same event loop
producer_task = asyncio.create_task(run_producer())
consumer_task = asyncio.create_task(run_consumer())
# Wait for both tasks to complete (or be cancelled by signal)
await asyncio.gather(producer_task, consumer_task)
try:
asyncio.run(main())
except asyncio.exceptions.CancelledError as e:
logger.warning(f'Asyncio task was cancelled: {e}')
logger.info('End of example')