1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
|
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------
import pytest
from azure.eventhub.aio import EventHubConsumerClient, EventHubProducerClient, EventHubSharedKeyCredential
from azure.eventhub.exceptions import AuthenticationError, ConnectError, EventHubError
@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_get_properties(live_eventhub, uamqp_transport):
    """Fetch the Event Hub properties with valid shared-key credentials.

    Checks that the reported hub name matches the ``live_eventhub`` fixture
    and that the partition ids are exactly ``['0', '1']``.
    """
    credential = EventHubSharedKeyCredential(
        live_eventhub['key_name'],
        live_eventhub['access_key'],
    )
    consumer = EventHubConsumerClient(
        live_eventhub['hostname'],
        live_eventhub['event_hub'],
        '$default',
        credential,
        uamqp_transport=uamqp_transport,
    )
    async with consumer:
        hub_info = await consumer.get_eventhub_properties()
        # Checked in the same order as the original combined condition.
        assert hub_info['eventhub_name'] == live_eventhub['event_hub']
        assert hub_info['partition_ids'] == ['0', '1']
@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_get_properties_with_auth_error_async(live_eventhub, uamqp_transport):
    """Expect ``AuthenticationError`` when the shared-key credential is wrong.

    Two scenarios are exercised one after the other against the live hub:
    a valid key name paired with a bogus access key, and a bogus key name
    paired with the valid access key.
    """
    # Scenario 1: correct key name, wrong access key.
    client = EventHubConsumerClient(
        live_eventhub['hostname'],
        live_eventhub['event_hub'],
        '$default',
        EventHubSharedKeyCredential(live_eventhub['key_name'], "AaBbCcDdEeFf="),
        uamqp_transport=uamqp_transport,
    )
    async with client:
        # The exception info object is not inspected, so it is not bound.
        with pytest.raises(AuthenticationError):
            await client.get_eventhub_properties()

    # Scenario 2: wrong key name, correct access key.
    client = EventHubConsumerClient(
        live_eventhub['hostname'],
        live_eventhub['event_hub'],
        '$default',
        EventHubSharedKeyCredential("invalid", live_eventhub['access_key']),
        uamqp_transport=uamqp_transport,
    )
    async with client:
        with pytest.raises(AuthenticationError):
            await client.get_eventhub_properties()
@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_get_properties_with_connect_error(live_eventhub, uamqp_transport):
    """Expect ``ConnectError`` when the target entity or host is wrong.

    Two scenarios are exercised one after the other with valid credentials:
    a real hostname with the Event Hub name ``"invalid"``, and the hostname
    ``"invalid.servicebus.windows.net"`` with the real Event Hub name.
    """
    # Scenario 1: valid namespace host, invalid Event Hub name.
    client = EventHubConsumerClient(
        live_eventhub['hostname'],
        "invalid",
        '$default',
        EventHubSharedKeyCredential(live_eventhub['key_name'], live_eventhub['access_key']),
        uamqp_transport=uamqp_transport,
    )
    async with client:
        # The exception info object is not inspected, so it is not bound.
        with pytest.raises(ConnectError):
            await client.get_eventhub_properties()

    # Scenario 2: invalid namespace host, valid Event Hub name.
    client = EventHubConsumerClient(
        "invalid.servicebus.windows.net",
        live_eventhub['event_hub'],
        '$default',
        EventHubSharedKeyCredential(live_eventhub['key_name'], live_eventhub['access_key']),
        uamqp_transport=uamqp_transport,
    )
    async with client:
        with pytest.raises(ConnectError):
            await client.get_eventhub_properties()
@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_get_partition_ids(live_eventhub, uamqp_transport):
    """Fetch the partition ids with valid credentials and expect ``['0', '1']``."""
    credential = EventHubSharedKeyCredential(
        live_eventhub['key_name'],
        live_eventhub['access_key'],
    )
    consumer = EventHubConsumerClient(
        live_eventhub['hostname'],
        live_eventhub['event_hub'],
        '$default',
        credential,
        uamqp_transport=uamqp_transport,
    )
    async with consumer:
        ids = await consumer.get_partition_ids()
        assert ids == ['0', '1']
@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_get_partition_properties(live_eventhub, uamqp_transport):
    """Fetch the properties of partition ``'0'`` through a producer client.

    Checks the hub name and partition id values, then verifies that the
    remaining expected keys are present in the returned mapping.
    """
    credential = EventHubSharedKeyCredential(
        live_eventhub['key_name'],
        live_eventhub['access_key'],
    )
    producer = EventHubProducerClient(
        live_eventhub['hostname'],
        live_eventhub['event_hub'],
        credential,
        uamqp_transport=uamqp_transport,
    )
    async with producer:
        partition_info = await producer.get_partition_properties('0')
        assert partition_info['eventhub_name'] == live_eventhub['event_hub']
        assert partition_info['id'] == '0'
        # Presence-only checks, in the same order as the original condition.
        expected_keys = (
            'beginning_sequence_number',
            'last_enqueued_sequence_number',
            'last_enqueued_offset',
            'last_enqueued_time_utc',
            'is_empty',
        )
        for key in expected_keys:
            assert key in partition_info
|