1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
|
#!/usr/bin/env python
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
"""
Examples to show how to create async EventHubProducerClient/EventHubConsumerClient.
"""
import asyncio
import os
from azure.eventhub import TransportType
from azure.eventhub.aio import EventHubProducerClient, EventHubConsumerClient, EventHubSharedKeyCredential
CONNECTION_STRING = os.environ['EVENT_HUB_CONN_STR']
FULLY_QUALIFIED_NAMESPACE = os.environ['EVENT_HUB_HOSTNAME']
EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']
SAS_POLICY = os.environ['EVENT_HUB_SAS_POLICY']
SAS_KEY = os.environ['EVENT_HUB_SAS_KEY']
CONSUMER_GROUP = "$Default"
async def create_producer_client():
print('Examples showing how to create producer client.')
# Create producer client from connection string.
producer_client = EventHubProducerClient.from_connection_string(
conn_str=CONNECTION_STRING # connection string contains EventHub name.
)
# Illustration of commonly used parameters.
producer_client = EventHubProducerClient.from_connection_string(
conn_str=CONNECTION_STRING,
eventhub_name=EVENTHUB_NAME, # EventHub name should be specified if it doesn't show up in connection string.
logging_enable=False, # To enable network tracing log, set logging_enable to True.
retry_total=3, # Retry up to 3 times to re-do failed operations.
transport_type=TransportType.Amqp # Use Amqp as the underlying transport protocol.
)
# Create producer client from constructor.
producer_client = EventHubProducerClient(
fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE,
eventhub_name=EVENTHUB_NAME,
credential=EventHubSharedKeyCredential(
policy=SAS_POLICY,
key=SAS_KEY
),
logging_enable=False, # To enable network tracing log, set logging_enable to True.
retry_total=3, # Retry up to 3 times to re-do failed operations.
transport_type=TransportType.Amqp # Use Amqp as the underlying transport protocol.
)
async with producer_client:
print("Calling producer client get eventhub properties:", await producer_client.get_eventhub_properties())
async def create_consumer_client():
print('Examples showing how to create consumer client.')
# Create consumer client from connection string.
consumer_client = EventHubConsumerClient.from_connection_string(
conn_str=CONNECTION_STRING, # connection string contains EventHub name.
consumer_group=CONSUMER_GROUP
)
# Illustration of commonly used parameters.
consumer_client = EventHubConsumerClient.from_connection_string(
conn_str=CONNECTION_STRING,
consumer_group=CONSUMER_GROUP,
eventhub_name=EVENTHUB_NAME, # EventHub name should be specified if it doesn't show up in connection string.
logging_enable=False, # To enable network tracing log, set logging_enable to True.
retry_total=3, # Retry up to 3 times to re-do failed operations.
transport_type=TransportType.Amqp # Use Amqp as the underlying transport protocol.
)
# Create consumer client from constructor.
consumer_client = EventHubConsumerClient(
fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE,
eventhub_name=EVENTHUB_NAME,
consumer_group=CONSUMER_GROUP,
credential=EventHubSharedKeyCredential(
policy=SAS_POLICY,
key=SAS_KEY
),
logging_enable=False, # To enable network tracing log, set logging_enable to True.
retry_total=3, # Retry up to 3 times to re-do failed operations.
transport_type=TransportType.Amqp # Use Amqp as the underlying transport protocol.
)
async with consumer_client:
print("Calling consumer client get eventhub properties:", await consumer_client.get_eventhub_properties())
asyncio.run(create_producer_client())
asyncio.run(create_consumer_client())
|