File: test_reconnect_async.py

package info (click to toggle)
python-azure 20201208%2Bgit-6
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 1,437,920 kB
  • sloc: python: 4,287,452; javascript: 269; makefile: 198; sh: 187; xml: 106
file content (120 lines) | stat: -rw-r--r-- 5,049 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------


import asyncio
import pytest
import time

from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient, EventHubConsumerClient, EventHubSharedKeyCredential
from azure.eventhub.exceptions import OperationTimeoutError

import uamqp
from uamqp import authentication, errors, c_uamqp, compat


@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_send_with_long_interval_async(live_eventhub, sleep):
    """Send one event, disturb the connection, send again, then read both back.

    Between the two sends the test either sleeps for 250 seconds (when the
    ``sleep`` fixture is truthy) or destroys the producer's underlying
    connection object. Both events are then read from partition "0" with a
    raw ``uamqp.ReceiveClient`` and checked.
    """
    test_partition = "0"
    credential = EventHubSharedKeyCredential(live_eventhub['key_name'], live_eventhub['access_key'])
    producer_client = EventHubProducerClient(live_eventhub['hostname'], live_eventhub['event_hub'], credential)

    async def send_single_event():
        # Build a one-event batch pinned to the test partition and send it.
        event_batch = await producer_client.create_batch(partition_id=test_partition)
        event_batch.add(EventData(b"A single event"))
        await producer_client.send_batch(event_batch)

    async with producer_client:
        await send_single_event()

        if not sleep:
            # NOTE(review): the result of destroy() is awaited here; confirm that
            # it actually returns an awaitable for this connection type.
            await producer_client._producers[test_partition]._handler._connection._conn.destroy()
        else:
            await asyncio.sleep(250)  # EH server-side idle timeout is 240 seconds

        await send_single_event()

    received = []
    uri = "sb://{}/{}".format(live_eventhub['hostname'], live_eventhub['event_hub'])
    sas_auth = authentication.SASTokenAuth.from_shared_access_key(
        uri, live_eventhub['key_name'], live_eventhub['access_key'])

    source = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format(
        live_eventhub['hostname'],
        live_eventhub['event_hub'],
        live_eventhub['consumer_group'],
        test_partition)
    raw_receiver = uamqp.ReceiveClient(source, auth=sas_auth, debug=False, timeout=10000, prefetch=10)
    try:
        raw_receiver.open()

        # receive_message_batch() returns as soon as it has received any messages,
        # without waiting for max_batch_size or the timeout to be reached, so one
        # call may yield anywhere from 1 to max_batch_size messages.
        # Call it twice to make sure both events are picked up.
        for _ in range(2):
            amqp_messages = raw_receiver.receive_message_batch(max_batch_size=1, timeout=5000)
            events = [EventData._from_message(message) for message in amqp_messages]
            received.extend(events)
    finally:
        raw_receiver.close()

    assert len(received) == 2
    first_body = list(received[0].body)
    assert first_body[0] == b"A single event"


@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_send_connection_idle_timeout_and_reconnect_async(connstr_receivers):
    """Check that a producer can send again after its connection sat idle.

    The producer client is created with ``idle_timeout=10`` and then left idle
    for 11 seconds. A direct ``_send_event_data()`` is expected to raise one of
    the connection/timeout errors, after which ``_send_event_data_with_retry()``
    must complete. Finally the event is read back from the first receiver in
    ``connstr_receivers`` and its body is checked.

    :param connstr_receivers: fixture yielding ``(connection_str, receivers)``.
    """
    connection_str, receivers = connstr_receivers
    client = EventHubProducerClient.from_connection_string(conn_str=connection_str, idle_timeout=10)
    async with client:
        ed = EventData('data')
        sender = client._create_producer(partition_id='0')
        async with sender:
            await sender._open_with_retry()
            # Stay idle for longer than the 10 second idle_timeout configured above.
            # NOTE(review): this is a blocking sleep inside a coroutine, presumably
            # on purpose so nothing services the connection meanwhile - confirm.
            time.sleep(11)
            sender._unsent_events = [ed.message]
            ed.message.on_send_complete = sender._on_outcome
            with pytest.raises((uamqp.errors.ConnectionClose,
                                uamqp.errors.MessageHandlerError, OperationTimeoutError)):
                # Mac may raise OperationTimeoutError or MessageHandlerError
                await sender._send_event_data()
            await sender._send_event_data_with_retry()

    # Poll the receiver a bounded number of times. Every attempt counts, whether
    # it ended in a timeout exception or an empty batch, so the loop cannot spin
    # forever, and the test fails explicitly if the event never shows up.
    received_ed1 = None
    for _ in range(3):
        try:
            messages = receivers[0].receive_message_batch(max_batch_size=10, timeout=10000)
        except compat.TimeoutException:
            continue
        if messages:
            received_ed1 = EventData._from_message(messages[0])
            break

    assert received_ed1 is not None, "no event was received after the producer reconnected"
    assert received_ed1.body_as_str() == 'data'


@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_receive_connection_idle_timeout_and_reconnect_async(connstr_senders):
    """Check that a consumer still receives after its connection sat idle.

    The consumer client is created with ``idle_timeout=10`` and left idle for
    11 seconds. An event is then sent through the first sender in
    ``connstr_senders``. After one ``do_work_async()`` pass the connection is
    asserted to be in the ``DISCARDING`` state, and a subsequent ``receive``
    call must deliver the event to the callback.

    :param connstr_senders: fixture yielding ``(connection_str, senders)``.
    """
    connection_str, senders = connstr_senders

    async def on_event_received(event):
        # Keep the most recent event on the callback object for the final assert.
        on_event_received.event = event

    consumer_client = EventHubConsumerClient.from_connection_string(
        conn_str=connection_str,
        consumer_group='$default',
        idle_timeout=10
    )

    async with consumer_client:
        partition_consumer = consumer_client._create_consumer("$default", "0", "-1", on_event_received)
        async with partition_consumer:
            await partition_consumer._open_with_retry()

            # Stay idle for longer than the 10 second idle_timeout configured above.
            time.sleep(11)

            outgoing_event = EventData("Event")
            senders[0].send(outgoing_event)

            await partition_consumer._handler.do_work_async()
            connection_state = partition_consumer._handler._connection._state
            assert connection_state == c_uamqp.ConnectionState.DISCARDING

            await partition_consumer.receive(batch=False, max_batch_size=1, max_wait_time=10)
            delivered_event = on_event_received.event
            assert delivered_event.body_as_str() == "Event"