1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
|
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import sys
import pytest
from azure.eventhub.aio import (
EventHubConsumerClient,
EventHubSharedKeyCredential,
)
from azure.eventhub.exceptions import AuthenticationError, ConnectError
@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_get_properties(auth_credentials_async, uamqp_transport, client_args):
    """Fetch Event Hub properties and check the reported name and partition ids.

    The ``auth_credentials_async`` fixture is unpacked into a namespace, an
    Event Hub name and a credential factory; the factory is called to build
    the credential handed to the consumer client.
    """
    namespace, hub_name, make_credential = auth_credentials_async
    hub_credential = make_credential()
    consumer = EventHubConsumerClient(
        fully_qualified_namespace=namespace,
        eventhub_name=hub_name,
        consumer_group="$default",
        credential=hub_credential,
        uamqp_transport=uamqp_transport,
        **client_args,
    )
    async with consumer:
        hub_properties = await consumer.get_eventhub_properties()
        # Checked in sequence: the partition ids are only inspected once the name matched.
        assert hub_properties["eventhub_name"] == hub_name
        assert hub_properties["partition_ids"] == ["0", "1"]
@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_get_properties_with_auth_error_async(auth_credentials_async, live_eventhub, uamqp_transport, client_args):
    """Verify that bad shared-key credentials surface as ``AuthenticationError``.

    Two clients are exercised one after the other:

    1. the key name from ``live_eventhub`` paired with a made-up key, and
    2. a made-up key name paired with the access key from ``live_eventhub``.

    In both cases ``get_eventhub_properties`` is expected to raise
    ``AuthenticationError``.
    """
    # The credential factory from the fixture is deliberately ignored: this
    # test builds its own (invalid) shared-key credentials.
    fully_qualified_namespace, eventhub_name, _ = auth_credentials_async

    # Case 1: key name from the fixture, bogus key value.
    client = EventHubConsumerClient(
        fully_qualified_namespace=fully_qualified_namespace,
        eventhub_name=eventhub_name,
        consumer_group="$default",
        credential=EventHubSharedKeyCredential(live_eventhub["key_name"], "AaBbCcDdEeFf="),
        uamqp_transport=uamqp_transport,
        **client_args,
    )
    async with client:
        with pytest.raises(AuthenticationError):
            await client.get_eventhub_properties()

    # Case 2: bogus key name, access key from the fixture.
    client = EventHubConsumerClient(
        fully_qualified_namespace=fully_qualified_namespace,
        eventhub_name=eventhub_name,
        consumer_group="$default",
        credential=EventHubSharedKeyCredential("invalid", live_eventhub["access_key"]),
        uamqp_transport=uamqp_transport,
        **client_args,
    )
    async with client:
        with pytest.raises(AuthenticationError):
            await client.get_eventhub_properties()
@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_get_properties_with_connect_error(auth_credentials_async, uamqp_transport, client_args):
    """Verify that an unreachable target surfaces as ``ConnectError``.

    Two clients are exercised one after the other, each with a credential
    produced by the fixture's credential factory:

    1. the fixture's namespace with the Event Hub name ``"invalid"``, and
    2. the namespace ``"invalid.servicebus.windows.net"`` with the fixture's
       Event Hub name.

    In both cases ``get_eventhub_properties`` is expected to raise
    ``ConnectError``.
    """
    fully_qualified_namespace, eventhub_name, credential = auth_credentials_async

    # Case 1: namespace from the fixture, Event Hub name replaced by "invalid".
    client = EventHubConsumerClient(
        fully_qualified_namespace=fully_qualified_namespace,
        eventhub_name="invalid",
        consumer_group="$default",
        credential=credential(),
        uamqp_transport=uamqp_transport,
        **client_args,
    )
    async with client:
        with pytest.raises(ConnectError):
            await client.get_eventhub_properties()

    # Case 2: Event Hub name from the fixture, namespace replaced by an invalid host.
    client = EventHubConsumerClient(
        fully_qualified_namespace="invalid.servicebus.windows.net",
        eventhub_name=eventhub_name,
        consumer_group="$default",
        credential=credential(),
        uamqp_transport=uamqp_transport,
        **client_args,
    )
    async with client:
        with pytest.raises(ConnectError):
            await client.get_eventhub_properties()
@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_get_partition_ids(auth_credentials_async, uamqp_transport, client_args):
    """Ask the consumer client for its partition ids and expect exactly ``["0", "1"]``."""
    namespace, hub_name, make_credential = auth_credentials_async
    hub_credential = make_credential()
    consumer = EventHubConsumerClient(
        fully_qualified_namespace=namespace,
        eventhub_name=hub_name,
        consumer_group="$default",
        credential=hub_credential,
        uamqp_transport=uamqp_transport,
        **client_args,
    )
    async with consumer:
        reported_ids = await consumer.get_partition_ids()
        assert reported_ids == ["0", "1"]
@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_get_partition_properties(auth_credentials_async, uamqp_transport, client_args):
    """Fetch the properties of partition ``"0"`` and check their identity and key set.

    The result must name the Event Hub from the fixture, carry ``id == "0"``
    and contain each of the sequence-number, offset, timestamp and emptiness
    keys listed below.
    """
    namespace, hub_name, make_credential = auth_credentials_async
    hub_credential = make_credential()
    consumer = EventHubConsumerClient(
        fully_qualified_namespace=namespace,
        eventhub_name=hub_name,
        consumer_group="$default",
        credential=hub_credential,
        uamqp_transport=uamqp_transport,
        **client_args,
    )
    async with consumer:
        partition_info = await consumer.get_partition_properties("0")

        # Checks run in a fixed order and stop at the first failure.
        assert partition_info["eventhub_name"] == hub_name
        assert partition_info["id"] == "0"
        required_keys = (
            "beginning_sequence_number",
            "last_enqueued_sequence_number",
            "last_enqueued_offset",
            "last_enqueued_time_utc",
            "is_empty",
        )
        for required_key in required_keys:
            assert required_key in partition_info
|