1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
|
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------
import asyncio
import pytest
import sys
from azure.eventhub import (
EventData,
EventDataBatch,
)
from azure.eventhub.exceptions import (
EventHubError,
ConnectError,
AuthenticationError,
EventDataSendError
)
from azure.eventhub.aio import EventHubConsumerClient, EventHubProducerClient
@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_send_with_invalid_hostname_async(invalid_hostname, connstr_receivers):
    """Expect ``ConnectError`` when sending a batch through a producer built
    from the ``invalid_hostname`` connection string.

    The value of ``connstr_receivers`` is not used by this test; the parameter
    is kept so the test signature (and the fixture it requests) is unchanged.
    """
    if sys.platform.startswith('darwin'):
        pytest.skip("Skipping on OSX - it keeps reporting 'Unable to set external certificates' "
                    "and blocking other tests")
    client = EventHubProducerClient.from_connection_string(invalid_hostname)
    async with client:
        with pytest.raises(ConnectError):
            batch = EventDataBatch()
            batch.add(EventData("test data"))
            await client.send_batch(batch)
@pytest.mark.parametrize("invalid_place",
                         ["hostname", "key_name", "access_key", "event_hub", "partition"])
@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_receive_with_invalid_param_async(live_eventhub, invalid_place):
    """Start a receive with one parameter made invalid and check the client
    still registers exactly one event processor.

    For every ``invalid_place`` except ``"partition"`` the matching entry of the
    hub configuration is replaced with ``"invalid <name>"`` before the
    connection string is formatted. For ``"partition"`` the configuration is
    left intact and the literal ``"partition"`` is used as the partition id.
    """
    corrupt_partition = invalid_place == "partition"

    settings = live_eventhub.copy()
    if not corrupt_partition:
        settings[invalid_place] = "invalid " + invalid_place

    conn_str = live_eventhub["connection_str"].format(
        settings["hostname"],
        settings["key_name"],
        settings["access_key"],
        settings["event_hub"],
    )
    consumer = EventHubConsumerClient.from_connection_string(
        conn_str, consumer_group="$default", retry_total=0)

    async def on_event(partition_context, event):
        # Events are irrelevant here; the callback only has to exist.
        pass

    # Only the partition id differs between the two scenarios.
    target_partition = invalid_place if corrupt_partition else "0"

    async with consumer:
        receive_task = asyncio.ensure_future(
            consumer.receive(on_event, partition_id=target_partition, starting_position="-1"))
        # Give the background receive time to start before inspecting the client.
        await asyncio.sleep(10)
        assert len(consumer._event_processors) == 1
    # NOTE(review): awaited after the client context has exited; the source's
    # indentation was lost, so this placement is assumed - confirm.
    await receive_task
@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_send_with_invalid_key_async(invalid_key):
    """Expect ``ConnectError`` when sending a batch through a producer built
    from the ``invalid_key`` connection string."""
    producer = EventHubProducerClient.from_connection_string(invalid_key)
    async with producer:
        with pytest.raises(ConnectError):
            event_batch = EventDataBatch()
            event_batch.add(EventData("test data"))
            await producer.send_batch(event_batch)
@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_send_with_invalid_policy_async(invalid_policy):
    """Expect ``ConnectError`` when sending a batch through a producer built
    from the ``invalid_policy`` connection string."""
    producer = EventHubProducerClient.from_connection_string(invalid_policy)
    async with producer:
        with pytest.raises(ConnectError):
            event_batch = EventDataBatch()
            event_batch.add(EventData("test data"))
            await producer.send_batch(event_batch)
@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_non_existing_entity_sender_async(connection_str):
    """Expect ``ConnectError`` when sending a batch to the event hub name
    ``"nemo"`` (presumably a hub that does not exist - verify against the
    test environment)."""
    producer = EventHubProducerClient.from_connection_string(
        connection_str, eventhub_name="nemo")
    async with producer:
        with pytest.raises(ConnectError):
            event_batch = EventDataBatch()
            event_batch.add(EventData("test data"))
            await producer.send_batch(event_batch)
@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_send_to_invalid_partitions_async(connection_str):
    """Expect ``ConnectError`` for each of several invalid partition ids.

    A fresh producer is created for every partition id and is always closed,
    whether or not the expectation holds.
    """
    for bad_partition in ("XYZ", "-1", "1000", "-"):
        producer = EventHubProducerClient.from_connection_string(connection_str)
        try:
            with pytest.raises(ConnectError):
                event_batch = await producer.create_batch(partition_id=bad_partition)
                event_batch.add(EventData("test data"))
                await producer.send_batch(event_batch)
        finally:
            await producer.close()
@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_send_too_large_message_async(connection_str):
    """Expect ``ValueError`` when a 1,100,000-byte event is added to a batch
    created with default settings.

    Skipped on macOS (``sys.platform`` starting with ``darwin``).
    """
    if sys.platform.startswith('darwin'):
        pytest.skip("Skipping on OSX - open issue regarding message size")
    client = EventHubProducerClient.from_connection_string(connection_str)
    try:
        data = EventData(b"A" * 1100000)
        # The batch is created outside the raises block: create_batch can
        # raise ValueError on its own (e.g. for an oversized
        # max_size_in_bytes), which would otherwise satisfy the expectation
        # without batch.add ever being exercised.
        batch = await client.create_batch()
        with pytest.raises(ValueError):
            batch.add(data)
    finally:
        await client.close()
@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_send_null_body_async(connection_str):
    """Expect ``ValueError`` somewhere in the sequence of building an event
    from ``None``, batching it and sending it.

    The producer is always closed afterwards.
    """
    producer = EventHubProducerClient.from_connection_string(connection_str)
    try:
        with pytest.raises(ValueError):
            # Statement order matters: the event is built before any batch is
            # requested from the service.
            null_event = EventData(None)
            event_batch = await producer.create_batch()
            event_batch.add(null_event)
            await producer.send_batch(event_batch)
    finally:
        await producer.close()
@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_create_batch_with_invalid_hostname_async(invalid_hostname):
    """Expect ``ConnectError`` when creating a 300-byte batch through a
    producer built from the ``invalid_hostname`` connection string.

    Skipped on macOS (``sys.platform`` starting with ``darwin``).
    """
    on_macos = sys.platform.startswith("darwin")
    if on_macos:
        pytest.skip("Skipping on OSX - it keeps reporting 'Unable to set external certificates' "
                    "and blocking other tests")
    producer = EventHubProducerClient.from_connection_string(invalid_hostname)
    async with producer:
        with pytest.raises(ConnectError):
            await producer.create_batch(max_size_in_bytes=300)
@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_create_batch_with_too_large_size_async(connection_str):
    """Expect ``ValueError`` when a batch is requested with a 5 MiB
    ``max_size_in_bytes``."""
    requested_size = 5 * 1024 * 1024
    producer = EventHubProducerClient.from_connection_string(connection_str)
    async with producer:
        with pytest.raises(ValueError):
            await producer.create_batch(max_size_in_bytes=requested_size)
|