#!/usr/bin/env python
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
"""
Examples to show sending events with different options to an Event Hub partition.
"""
import time
import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.eventhub.exceptions import EventHubError
# Connection settings are read from the environment when the module is loaded;
# a missing variable raises KeyError right here, before any client is created.
CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR']
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']
def send_event_data_batch(producer):
    """Send a batch holding one event, created with default options.

    No partition_id or partition_key is given, so the events will be
    distributed to available partitions via round-robin.

    :param producer: client used to create the batch and to send it.
    """
    batch = producer.create_batch()
    single_event = EventData('Single message')
    batch.add(single_event)
    producer.send_batch(batch)
def send_event_data_batch_with_limited_size(producer):
    """Fill a batch capped at 1000 bytes until it rejects an event, then send it.

    No partition_id or partition_key is given, so the events will be
    distributed to available partitions via round-robin.

    :param producer: client used to create the batch and to send it.
    """
    size_capped_batch = producer.create_batch(max_size_in_bytes=1000)
    try:
        # Keep adding events; the ValueError is what ends the loop.
        while True:
            size_capped_batch.add(EventData('Message inside EventBatchData'))
    except ValueError:
        # EventDataBatch object reaches max_size.
        # New EventDataBatch object can be created here to send more data.
        pass
    producer.send_batch(size_capped_batch)
def send_event_data_batch_with_partition_key(producer):
    """Send a one-event batch that was created with partition_key='pkey'.

    :param producer: client used to create the batch and to send it.
    """
    keyed_batch = producer.create_batch(partition_key='pkey')
    keyed_event = EventData(
        'Message will be sent to a partition determined by the partition key'
    )
    keyed_batch.add(keyed_event)
    producer.send_batch(keyed_batch)
def send_event_data_batch_with_partition_id(producer):
    """Send a one-event batch that was created with partition_id='0'.

    :param producer: client used to create the batch and to send it.
    """
    targeted_batch = producer.create_batch(partition_id='0')
    targeted_event = EventData('Message will be sent to target-id partition')
    targeted_batch.add(targeted_event)
    producer.send_batch(targeted_batch)
def send_event_data_batch_with_properties(producer):
    """Send a one-event batch whose event has its ``properties`` attribute set.

    The event is assigned ``{'prop_key': 'prop_value'}`` before it is added
    to a batch created with default options.

    :param producer: client used to create the batch and to send it.
    """
    event_with_props = EventData('Message with properties')
    event_with_props.properties = {'prop_key': 'prop_value'}

    batch = producer.create_batch()
    batch.add(event_with_props)
    producer.send_batch(batch)
def send_event_data_list(producer):
    """Send ten events as a plain list, without building an EventDataBatch.

    If you know beforehand that the list of events will not exceed the size
    limits, the list can be handed to ``send_batch()`` directly. No
    partition_id or partition_key is given, so the events will be distributed
    to available partitions via round-robin.

    A ValueError or EventHubError raised by the send is reported with
    ``print`` and not propagated.

    :param producer: client used to send the list of events.
    """
    events = [EventData('Event Data {}'.format(index)) for index in range(10)]
    try:
        producer.send_batch(events)
    except ValueError:
        # Size exceeds limit. This shouldn't happen if you make sure beforehand.
        print("Size of the event data list exceeds the size limit of a single send")
    except EventHubError as send_error:
        print("Sending error: ", send_error)
# Build the client from the connection settings defined at the top of the file.
producer = EventHubProducerClient.from_connection_string(
    conn_str=CONNECTION_STR, eventhub_name=EVENTHUB_NAME
)

# The clock starts after the client object exists. The elapsed time is printed
# after the `with` block, so it also covers the producer's context-manager exit.
start_time = time.time()
with producer:
    # Run every sample in the order they are defined above.
    for send_sample in (
        send_event_data_batch,
        send_event_data_batch_with_limited_size,
        send_event_data_batch_with_partition_key,
        send_event_data_batch_with_partition_id,
        send_event_data_batch_with_properties,
        send_event_data_list,
    ):
        send_sample(producer)

print("Send messages in {} seconds.".format(time.time() - start_time))