File: test_connection_async.py

package info (click to toggle)
python-azure 20250603%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (58 lines) | stat: -rw-r--r-- 2,701 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import sys

if sys.version_info >= (3, 8):
    from unittest.mock import AsyncMock, Mock

import pytest

from azure.eventhub._pyamqp.aio import Connection


@pytest.mark.asyncio
async def test_connection_begin_session():
    if sys.version_info < (3, 8):
        pytest.skip("AsyncMock is not available in Python 3.7")
    connection = Connection("sb://fake.host.com")
    connection._process_outgoing_frame = AsyncMock(return_value=None)
    # create session on the Connection
    session = connection.create_session(network_trace=False)
    outgoing_channel = session.channel
    # mock starting a begin session to the server
    session.begin = Mock(return_value=None)
    session.begin()
    # in response from the server we should get back a BEGIN frame
    incoming_channel = 0
    incoming_frame = (1, 0, 0, 0, 0, 0, 0, 0)
    connection.listen = AsyncMock(side_effect=await connection._incoming_begin(incoming_channel, incoming_frame))
    await connection.listen()
    assert incoming_channel in connection._incoming_endpoints
    assert outgoing_channel in connection._outgoing_endpoints
    assert outgoing_channel == connection._incoming_endpoints[incoming_channel].channel
    assert connection._incoming_endpoints[incoming_channel] == connection._outgoing_endpoints[outgoing_channel]


@pytest.mark.asyncio
async def test_connection_end_session_on_timeout():
    if sys.version_info < (3, 8):
        pytest.skip("AsyncMock is not available in Python 3.7")
    connection = Connection("sb://fake.host.com")
    connection._process_outgoing_frame = AsyncMock(return_value=None)
    # create session on the Connection
    session = connection.create_session(network_trace=False)
    outgoing_channel = session.channel
    # mock starting a begin session to the server
    session.begin = Mock(return_value=None)
    session.begin()
    # in response from the server we should get back a BEGIN frame
    incoming_channel = 0
    incoming_frame = (1, 0, 0, 0, 0, 0, 0, 0)
    connection.listen = AsyncMock(side_effect=await connection._incoming_begin(incoming_channel, incoming_frame))
    await connection.listen()
    assert outgoing_channel == connection._incoming_endpoints[incoming_channel].channel
    # typically after some inactivity(60,000 ms) on the connection, the server will force close a connection
    # and will send across an END frame
    connection.listen = AsyncMock(side_effect=await connection._incoming_end(incoming_channel, None))
    await connection.listen()
    # end points dont have the channels anymore and Session link is gone.
    assert outgoing_channel not in connection._outgoing_endpoints
    assert incoming_channel not in connection._incoming_endpoints