1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
|
#!/usr/bin/env python
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
"""
An example to show receiving events from Event Hub partitions with custom starting position.
"""
import os
from azure.eventhub import EventHubConsumerClient, EventHubProducerClient, EventData
from azure.identity import DefaultAzureCredential
# Connection settings are read from the environment at import time; a missing
# variable raises KeyError before any client is created.
# EVENT_HUB_HOSTNAME is passed to the clients as `fully_qualified_namespace`
# (presumably of the form "<namespace>.servicebus.windows.net" -- confirm).
FULLY_QUALIFIED_NAMESPACE = os.environ["EVENT_HUB_HOSTNAME"]
# EVENT_HUB_NAME is passed to the clients as `eventhub_name`.
EVENTHUB_NAME = os.environ["EVENT_HUB_NAME"]
def on_partition_initialize(partition_context):
    """Report that a partition has been initialized.

    Supplied to ``consumer_client.receive`` as the ``on_partition_initialize``
    callback.

    :param partition_context: Object exposing the ``partition_id`` of the
        partition that was initialized.
    """
    # Put your code here.
    initialized_id = partition_context.partition_id
    message = "Partition: {} has been initialized.".format(initialized_id)
    print(message)
def on_partition_close(partition_context, reason):
    """Report that a partition has been closed and why.

    Supplied to ``consumer_client.receive`` as the ``on_partition_close``
    callback.

    :param partition_context: Object exposing the ``partition_id`` of the
        partition that was closed.
    :param reason: The reason for closing; it is formatted into the message
        as-is.
    """
    # Put your code here.
    template = "Partition: {} has been closed, reason for closing: {}."
    closed_id = partition_context.partition_id
    print(template.format(closed_id, reason))
def on_error(partition_context, error):
    """Report an error raised while receiving or while load balancing.

    Supplied to ``consumer_client.receive`` as the ``on_error`` callback.

    :param partition_context: Object exposing the ``partition_id`` of the
        partition the error relates to, or ``None`` when the error is not
        tied to a specific partition.
    :param error: The exception (or other object) describing the failure; it
        is formatted into the message as-is.
    """
    # Put your code here. partition_context can be None in the on_error callback.
    if partition_context is None:
        print("An exception: {} occurred during the load balance process.".format(error))
        return

    # The error fills the first placeholder and the partition id the second,
    # matching the wording of the message.
    print(
        "An exception: {} occurred during receiving from Partition: {}.".format(
            error, partition_context.partition_id
        )
    )
def on_event(partition_context, event):
    """Print the body of a received event and the partition it came from.

    Supplied to ``consumer_client.receive`` as the ``on_event`` callback.

    :param partition_context: Object exposing the ``partition_id`` of the
        partition the event was received from.
    :param event: The received event; its ``body_as_str()`` result is printed.
    """
    # Put your code here.
    body_text = event.body_as_str()
    source_partition = partition_context.partition_id
    print("Received event: {} from partition: {}.".format(body_text, source_partition))
if __name__ == "__main__":
    # Step 1: publish four events to partition 0 so there is known data to
    # receive from a custom starting position.
    producer_client = EventHubProducerClient(
        fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE,
        eventhub_name=EVENTHUB_NAME,
        credential=DefaultAzureCredential(),
    )

    with producer_client:
        event_data_batch_to_partition_0 = producer_client.create_batch(partition_id="0")
        for event_body in (
            "First event in partition 0",
            "Second event in partition 0",
            "Third event in partition 0",
            "Forth event in partition 0",
        ):
            event_data_batch_to_partition_0.add(EventData(event_body))
        producer_client.send_batch(event_data_batch_to_partition_0)

    # Step 2: receive from partition 0, starting relative to the last enqueued
    # sequence number.
    consumer_client = EventHubConsumerClient(
        fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE,
        consumer_group="$Default",
        eventhub_name=EVENTHUB_NAME,
        credential=DefaultAzureCredential(),
    )

    try:
        # The partition lookup happens inside the `with` block so the client
        # is closed even if the lookup itself raises.
        with consumer_client:
            partition_0_prop = consumer_client.get_partition_properties("0")
            partition_0_last_enqueued_sequence_number = partition_0_prop["last_enqueued_sequence_number"]

            # client will receive messages from the position of the third from last event on partition 0.
            # NOTE(review): this relies on the library's default of an
            # exclusive starting position (events *after* the given sequence
            # number) -- confirm against the azure-eventhub `receive` docs.
            starting_position = {
                "0": partition_0_last_enqueued_sequence_number - 3,
            }

            # receive() keeps running until it is interrupted (Ctrl+C here).
            consumer_client.receive(
                partition_id="0",
                on_event=on_event,
                on_partition_initialize=on_partition_initialize,
                on_partition_close=on_partition_close,
                on_error=on_error,
                starting_position=starting_position,
            )
    except KeyboardInterrupt:
        print("Stopped receiving.")
|