#!/usr/bin/env python
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
"""
Examples to show sending events with different options to an Event Hub asynchronously.
"""
# pylint: disable=C0111
import time
import asyncio
import os

from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub.exceptions import EventHubError
from azure.eventhub import EventData

CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR']
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']


async def send_event_data_batch(producer):
    # Without specifying partition_id or partition_key
    # the events will be distributed to available partitions via round-robin.
    event_data_batch = await producer.create_batch()
    event_data_batch.add(EventData('Single message'))
    await producer.send_batch(event_data_batch)


async def send_event_data_batch_with_limited_size(producer):
    # Without specifying partition_id or partition_key
    # the events will be distributed to available partitions via round-robin.
    event_data_batch_with_limited_size = await producer.create_batch(max_size_in_bytes=1000)

    while True:
        try:
            event_data_batch_with_limited_size.add(EventData('Message inside EventDataBatch'))
        except ValueError:
            # The EventDataBatch object has reached max_size_in_bytes.
            # A new EventDataBatch object can be created here to send more data.
            break

    await producer.send_batch(event_data_batch_with_limited_size)
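

async def send_event_data_in_multiple_batches(producer, events):
    # Hypothetical helper, not part of the original sample: a minimal sketch of the
    # "create a new EventDataBatch" idea mentioned in the comment above.
    # Assumptions: `events` is an iterable of EventData objects, and each event is
    # small enough to fit into an empty batch on its own.
    event_data_batch = await producer.create_batch()
    for event_data in events:
        try:
            event_data_batch.add(event_data)
        except ValueError:
            # The current batch is full: send it, then start a new batch
            # with the event that did not fit.
            await producer.send_batch(event_data_batch)
            event_data_batch = await producer.create_batch()
            event_data_batch.add(event_data)
    # Send whatever is left in the last, partially filled batch.
    if len(event_data_batch) > 0:
        await producer.send_batch(event_data_batch)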


async def send_event_data_batch_with_partition_key(producer):
    # Specifying partition_key.
    event_data_batch_with_partition_key = await producer.create_batch(partition_key='pkey')
    event_data_batch_with_partition_key.add(
        EventData('Message will be sent to a partition determined by the partition key')
    )
    await producer.send_batch(event_data_batch_with_partition_key)


async def send_event_data_batch_with_partition_id(producer):
    # Specifying partition_id.
    event_data_batch_with_partition_id = await producer.create_batch(partition_id='0')
    event_data_batch_with_partition_id.add(EventData('Message will be sent to target-id partition'))
    await producer.send_batch(event_data_batch_with_partition_id)


async def send_event_data_batch_with_properties(producer):
    event_data_batch = await producer.create_batch()
    event_data = EventData('Message with properties')
    event_data.properties = {'prop_key': 'prop_value'}
    event_data_batch.add(event_data)
    await producer.send_batch(event_data_batch)


async def send_event_data_list(producer):
    # If you know beforehand that your list of events will not exceed the size limit,
    # you can pass it to `send_batch()` directly without creating an EventDataBatch.
    # Without specifying partition_id or partition_key
    # the events will be distributed to available partitions via round-robin.
    event_data_list = [EventData('Event Data {}'.format(i)) for i in range(10)]
    try:
        await producer.send_batch(event_data_list)
    except ValueError:  # Size exceeds limit. This shouldn't happen if you check the size beforehand.
        print("Size of the event data list exceeds the size limit of a single send")
    except EventHubError as eh_err:
        print("Sending error: ", eh_err)


async def run():
    producer = EventHubProducerClient.from_connection_string(
        conn_str=CONNECTION_STR,
        eventhub_name=EVENTHUB_NAME
    )
    async with producer:
        await send_event_data_batch(producer)
        await send_event_data_batch_with_limited_size(producer)
        await send_event_data_batch_with_partition_key(producer)
        await send_event_data_batch_with_partition_id(producer)
        await send_event_data_batch_with_properties(producer)
        await send_event_data_list(producer)


start_time = time.time()
# asyncio.run() (Python 3.7+) creates and closes the event loop for us.
asyncio.run(run())
print("Sent messages in {} seconds.".format(time.time() - start_time))