1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
|
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import time
import pytest
from azure.eventhub import (
EventData,
EventHubSharedKeyCredential,
EventHubProducerClient,
EventHubConsumerClient,
)
from azure.eventhub.exceptions import OperationTimeoutError
from azure.eventhub._utils import transform_outbound_single_message
from azure.eventhub._pyamqp.authentication import SASTokenAuth
from azure.eventhub._pyamqp import ReceiveClient, error, constants
from azure.eventhub._transport._pyamqp_transport import PyamqpTransport
try:
import uamqp
from uamqp import compat
from azure.eventhub._transport._uamqp_transport import UamqpTransport
except (ModuleNotFoundError, ImportError):
UamqpTransport = None
@pytest.mark.liveTest
def test_send_with_long_interval_sync(live_eventhub, sleep, uamqp_transport, timeout_factor, client_args):
test_partition = "0"
sender = EventHubProducerClient(
live_eventhub["hostname"],
live_eventhub["event_hub"],
EventHubSharedKeyCredential(live_eventhub["key_name"], live_eventhub["access_key"]),
uamqp_transport=uamqp_transport,
**client_args
)
with sender:
batch = sender.create_batch(partition_id=test_partition)
batch.add(EventData(b"A single event"))
sender.send_batch(batch)
if sleep:
time.sleep(250)
else:
if uamqp_transport:
sender._producers[test_partition]._handler._connection._conn.destroy()
else:
sender._producers[test_partition]._handler._connection.close()
batch = sender.create_batch(partition_id=test_partition)
batch.add(EventData(b"A single event"))
sender.send_batch(batch)
received = []
uri = "sb://{}/{}".format(live_eventhub["hostname"], live_eventhub["event_hub"])
source = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format(
live_eventhub["hostname"],
live_eventhub["event_hub"],
live_eventhub["consumer_group"],
test_partition,
)
if uamqp_transport:
sas_auth = uamqp.authentication.SASTokenAuth.from_shared_access_key(
uri, live_eventhub["key_name"], live_eventhub["access_key"]
)
receiver = uamqp.ReceiveClient(source, auth=sas_auth, debug=False, timeout=5000, prefetch=500)
else:
sas_auth = SASTokenAuth(uri, uri, live_eventhub["key_name"], live_eventhub["access_key"])
receiver = ReceiveClient(
live_eventhub["hostname"],
source,
auth=sas_auth,
debug=False,
link_credit=500,
)
try:
receiver.open()
# receive_message_batch() returns immediately once it receives any messages before the max_batch_size
# and timeout reach. Could be 1, 2, or any number between 1 and max_batch_size.
# So call it twice to ensure the two events are received.
received.extend(
[
EventData._from_message(x)
for x in receiver.receive_message_batch(max_batch_size=1, timeout=5 * timeout_factor)
]
)
received.extend(
[
EventData._from_message(x)
for x in receiver.receive_message_batch(max_batch_size=1, timeout=5 * timeout_factor)
]
)
finally:
receiver.close()
assert len(received) == 2
assert list(received[0].body)[0] == b"A single event"
@pytest.mark.liveTest
def test_send_connection_idle_timeout_and_reconnect_sync(auth_credential_receivers, uamqp_transport, timeout_factor, client_args):
fully_qualified_namespace, eventhub_name, credential, receivers = auth_credential_receivers
if uamqp_transport:
amqp_transport = UamqpTransport
retry_total = 3
timeout_exc = compat.TimeoutException
else:
amqp_transport = PyamqpTransport
retry_total = 0
timeout_exc = TimeoutError
client = EventHubProducerClient(
fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
credential=credential(),
idle_timeout=10,
retry_total=retry_total,
uamqp_transport=uamqp_transport,
**client_args
)
with client:
ed = EventData("data")
sender = client._create_producer(partition_id="0")
with sender:
sender._open_with_retry()
time.sleep(11)
ed = transform_outbound_single_message(ed, EventData, amqp_transport.to_outgoing_amqp_message)
sender._unsent_events = [ed._message]
if uamqp_transport:
sender._unsent_events[0].on_send_complete = sender._on_outcome
with pytest.raises(
(
uamqp.errors.ConnectionClose,
uamqp.errors.MessageHandlerError,
OperationTimeoutError,
)
):
sender._send_event_data()
else:
with pytest.raises(error.AMQPConnectionError):
sender._send_event_data()
if uamqp_transport:
sender._send_event_data_with_retry()
if not uamqp_transport:
client = EventHubProducerClient(
fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
credential=credential(),
idle_timeout=10,
uamqp_transport=uamqp_transport,
**client_args
)
with client:
ed = EventData("data")
sender = client._create_producer(partition_id="0")
with sender:
sender._open_with_retry()
time.sleep(11)
ed = transform_outbound_single_message(ed, EventData, amqp_transport.to_outgoing_amqp_message)
sender._unsent_events = [ed._message]
with pytest.raises(error.AMQPConnectionError):
sender._send_event_data()
@pytest.mark.liveTest
def test_receive_connection_idle_timeout_and_reconnect_sync(auth_credential_senders, uamqp_transport, client_args):
fully_qualified_namespace, eventhub_name, credential, senders = auth_credential_senders
client = EventHubConsumerClient(
fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
credential=credential(),
consumer_group="$default",
idle_timeout=10,
uamqp_transport=uamqp_transport,
**client_args
)
def on_event_received(event):
on_event_received.event = event
with client:
consumer = client._create_consumer("$default", "0", "-1", on_event_received)
with consumer:
while not consumer.handler_ready:
consumer._open()
time.sleep(11)
ed = EventData("Event")
senders[0].send(ed)
if uamqp_transport:
consumer._handler.do_work()
assert consumer._handler._connection._state == uamqp.c_uamqp.ConnectionState.DISCARDING
else:
with pytest.raises(error.AMQPConnectionError):
consumer._handler.do_work()
assert consumer._handler._connection.state == constants.ConnectionState.END
duration = 10
now_time = time.time()
end_time = now_time + duration
while now_time < end_time:
consumer.receive()
time.sleep(0.01)
now_time = time.time()
assert on_event_received.event.body_as_str() == "Event"
|