1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
|
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from azure.core.utils import parse_connection_string
from uamqp.client import ReceiveClient
from uamqp.async_ops.client_async import ReceiveClientAsync
from uamqp import authentication
from devtools_testutils.perfstress_tests import EventPerfTest
class UamqpReceiveEventTest(EventPerfTest):
def __init__(self, arguments):
super().__init__(arguments)
live_eventhub_config = self._get_eh_connection_config()
uri = "{}{}".format(live_eventhub_config["hostname"], live_eventhub_config["event_hub"])
source = "{}{}/ConsumerGroups/{}/Partitions/{}".format(
live_eventhub_config["hostname"],
live_eventhub_config["event_hub"],
live_eventhub_config["consumer_group"],
live_eventhub_config["partition"],
).replace("sb://", "amqps://")
# Setup service clients
self.receive_client = ReceiveClient(
source,
auth=authentication.SASTokenAuth.from_shared_access_key(
uri, live_eventhub_config["key_name"], live_eventhub_config["access_key"]
),
timeout=0,
debug=False,
)
self.async_receive_client = ReceiveClientAsync(
source,
auth=authentication.SASTokenAsync.from_shared_access_key(
uri, live_eventhub_config["key_name"], live_eventhub_config["access_key"]
),
timeout=0,
debug=False,
)
def _get_eh_connection_config(self):
connection_string = self.get_from_env("AZURE_EVENTHUB_CONNECTION_STRING")
auth_info = parse_connection_string(connection_string)
config = {}
config["hostname"] = auth_info["endpoint"]
config["event_hub"] = self.get_from_env("AZURE_EVENTHUB_NAME")
config["key_name"] = auth_info["sharedaccesskeyname"]
config["access_key"] = auth_info["sharedaccesskey"]
config["consumer_group"] = "$Default"
config["partition"] = "0"
return config
def process_event_sync(self, message):
try:
message.annotations
message.properties
message.get_data()
message.header
message.delivery_annotations
self.event_raised_sync()
except Exception as e:
self.error_raised_sync(e)
def start_events_sync(self) -> None:
"""
Start the process for receiving events.
"""
self.receive_client.receive_messages(self.process_event_sync)
def stop_events_sync(self) -> None:
"""
Stop the process for receiving events.
"""
self.receive_client.close()
async def start_events_async(self) -> None:
"""
Start the process for receiving events.
"""
await self.async_receive_client.receive_messages_async(self.process_event_sync)
async def stop_events_async(self) -> None:
"""
Stop the process for receiving events.
"""
await self.async_receive_client.close_async()
|