1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
|
#!/usr/bin/env python
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
"""
Examples showing how to create an EventHubProducerClient and an EventHubConsumerClient that connect to a custom endpoint.
"""
import os
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient, EventData
from azure.identity import DefaultAzureCredential
# Connection settings read from the environment. Both lookups use os.environ[...],
# so a missing variable raises KeyError as soon as this module is loaded.
# EVENT_HUB_HOSTNAME is passed to the clients as `fully_qualified_namespace`.
FULLY_QUALIFIED_NAMESPACE = os.environ["EVENT_HUB_HOSTNAME"]
# EVENT_HUB_NAME is passed to the clients as `eventhub_name`.
EVENTHUB_NAME = os.environ["EVENT_HUB_NAME"]
# The custom endpoint address to use for establishing a connection to the Event Hubs service,
# allowing network requests to be routed through any application gateways
# or other paths needed for the host environment.
# This is a placeholder: replace the <...> parts with real values before running.
CUSTOM_ENDPOINT_ADDRESS = "sb://<custom_endpoint_hostname>:<custom_endpoint_port>"
# The optional absolute path to the custom certificate file used by the client to authenticate
# the identity of the connection endpoint when that endpoint has its own issued CA.
# If not set, the certifi library will be used to load certificates.
# This is a placeholder: replace it with a real file path before running.
CUSTOM_CA_BUNDLE_PATH = "<your_custom_ca_bundle_file_path>"
def producer_connecting_to_custom_endpoint():
    """Send one event using a producer client configured with the custom endpoint.

    The client is built from the module-level settings: the namespace and
    event hub name, a ``DefaultAzureCredential``, the custom endpoint address
    and the custom CA bundle path (passed as ``connection_verify``).
    """
    client_options = {
        "fully_qualified_namespace": FULLY_QUALIFIED_NAMESPACE,
        "eventhub_name": EVENTHUB_NAME,
        "credential": DefaultAzureCredential(),
        "custom_endpoint_address": CUSTOM_ENDPOINT_ADDRESS,
        "connection_verify": CUSTOM_CA_BUNDLE_PATH,
    }
    producer = EventHubProducerClient(**client_options)

    # The context manager scopes the client's lifetime to this block.
    with producer:
        # No partition_id or partition_key is given, so the events are
        # distributed to the available partitions via round-robin.
        batch = producer.create_batch()
        batch.add(EventData("Single message"))
        producer.send_batch(batch)
        print("Send a message.")
def on_event(partition_context, event):
    """Print the id of the partition that delivered an event.

    This is the callback handed to the consumer client's ``receive`` call.

    :param partition_context: Object whose ``partition_id`` attribute is reported.
    :param event: The received event; this sample does not inspect it.
    """
    # Put your code here.
    # If the operation is i/o intensive, multi-thread will have better performance.
    source_partition = partition_context.partition_id
    message = "Received event from partition: {}.".format(source_partition)
    print(message)
def consumer_connecting_to_custom_endpoint():
    """Receive events using a consumer client configured with the custom endpoint.

    The client reads from the ``$Default`` consumer group and is built from the
    module-level settings: the namespace and event hub name, a
    ``DefaultAzureCredential``, the custom endpoint address and the custom CA
    bundle path (passed as ``connection_verify``). Every received event is
    handed to ``on_event``. A ``KeyboardInterrupt`` raised while receiving is
    caught and reported instead of propagating.
    """
    client_options = {
        "fully_qualified_namespace": FULLY_QUALIFIED_NAMESPACE,
        "consumer_group": "$Default",
        "eventhub_name": EVENTHUB_NAME,
        "credential": DefaultAzureCredential(),
        "custom_endpoint_address": CUSTOM_ENDPOINT_ADDRESS,
        "connection_verify": CUSTOM_CA_BUNDLE_PATH,
    }
    consumer = EventHubConsumerClient(**client_options)

    try:
        with consumer:
            # "-1" is from the beginning of the partition.
            consumer.receive(on_event=on_event, starting_position="-1")
    except KeyboardInterrupt:
        print("Stopped receiving.")
# Run both samples in order when this file is executed: the producer sends one
# event first, then the consumer starts receiving from the beginning of each
# partition (starting_position="-1").
# NOTE(review): these calls run at module load, so importing this file also
# triggers them. The consumer's receive call is wrapped in a KeyboardInterrupt
# handler, so it presumably runs until interrupted (e.g. Ctrl+C) - confirm.
producer_connecting_to_custom_endpoint()
consumer_connecting_to_custom_endpoint()
|