1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
|
from unittest.mock import Mock
from azure.eventhub._pyamqp.error import AMQPLinkError
from azure.eventhub._pyamqp.link import Link
from azure.eventhub._pyamqp.receiver import ReceiverLink
from azure.eventhub._pyamqp.constants import LinkState
import pytest
@pytest.mark.parametrize(
"start_state,expected_state",
[
(LinkState.ATTACHED, LinkState.DETACH_SENT),
(LinkState.ATTACH_SENT, LinkState.DETACHED),
(LinkState.ATTACH_RCVD, LinkState.DETACHED),
],
ids=["link attached", "link attach sent", "link attach rcvd"],
)
def test_link_should_detach(start_state, expected_state):
session = Mock()
link = Link(
session,
3,
name="test_link",
role=True,
source_address="test_source",
target_address="test_target",
network_trace=False,
network_trace_params={},
)
assert link.state == LinkState.DETACHED
link._set_state(start_state)
link._outgoing_detach = Mock(return_value=None)
link.detach()
assert link.state == expected_state
@pytest.mark.parametrize(
"state",
[LinkState.DETACHED, LinkState.DETACH_SENT, LinkState.ERROR],
ids=["link detached", "link detach sent", "link error"],
)
def test_link_should_not_detach(state):
session = None
link = Link(
session,
3,
name="test_link",
role=True,
source_address="test_source",
target_address="test_target",
network_trace=False,
network_trace_params={},
)
assert link.state == LinkState.DETACHED
link._set_state(state)
link._outgoing_detach = Mock(return_value=None)
link.detach()
link._outgoing_detach.assert_not_called()
def test_receive_transfer_frame_multiple():
session = None
link = ReceiverLink(
session,
3,
source_address="test_source",
target_address="test_target",
network_trace=False,
network_trace_params={},
on_transfer=Mock(),
)
link.current_link_credit = 2 # Set the link credit to 2
# frame: handle, delivery_id, delivery_tag, message_format, settled, more, rcv_settle_mode, state, resume, aborted, bathable, payload
transfer_frame_one = [3, 0, b"/blah", 0, True, True, None, None, None, None, False, ""]
transfer_frame_two = [3, None, b"/blah", 0, True, False, None, None, None, None, False, ""]
link._incoming_transfer(transfer_frame_one)
assert link.current_link_credit == 2
link._incoming_transfer(transfer_frame_two)
assert link.current_link_credit == 1
def test_receive_transfer_continuation_frame():
session = None
link = ReceiverLink(
session,
3,
source_address="test_source",
target_address="test_target",
network_trace=False,
network_trace_params={},
on_transfer=Mock(),
)
link.current_link_credit = 3 # Set the link credit to 2
# frame: handle, delivery_id, delivery_tag, message_format, settled, more, rcv_settle_mode, state, resume, aborted, batchable, payload
transfer_frame_one = [3, 0, b"/blah", 0, True, False, None, None, None, None, False, ""]
transfer_frame_two = [3, 1, b"/blah", 0, True, True, None, None, None, None, False, ""]
transfer_frame_three = [3, None, b"/blah", 0, True, False, None, None, None, None, False, ""]
link._incoming_transfer(transfer_frame_one)
assert link.current_link_credit == 2
assert link.delivery_count == 1
link._incoming_transfer(transfer_frame_two)
assert link.current_link_credit == 2
assert link.delivery_count == 1
link._incoming_transfer(transfer_frame_three)
assert link.current_link_credit == 1
assert link.delivery_count == 2
def test_receive_transfer_and_flow():
def mock_outgoing():
pass
session = None
link = ReceiverLink(
session,
3,
source_address="test_source",
target_address="test_target",
network_trace=False,
network_trace_params={},
on_transfer=Mock(),
)
link._outgoing_flow = mock_outgoing
link.flow(link_credit=100) # Send a flow frame with desired link credit of 100
# frame: handle, delivery_id, delivery_tag, message_format, settled, more, rcv_settle_mode, state, resume, aborted, batchable, payload
transfer_frame_one = [3, 0, b"/blah", 0, True, False, None, None, None, None, False, ""]
transfer_frame_two = [3, 1, b"/blah", 0, True, False, None, None, None, None, False, ""]
transfer_frame_three = [3, 2, b"/blah", 0, True, False, None, None, None, None, False, ""]
link._incoming_transfer(transfer_frame_one)
assert link.current_link_credit == 99
# Only received 1 transfer frame per receive call, we set desired link credit again
# this will send a flow of 1
link.flow(link_credit=100)
assert link.current_link_credit == 100
link._incoming_transfer(transfer_frame_two)
assert link.current_link_credit == 99
link._incoming_transfer(transfer_frame_three)
assert link.current_link_credit == 98
@pytest.mark.parametrize(
"frame",
[
[2, True, [b'amqp:link:detach-forced', b"The link is force detached. Code: publisher(link3006875). Details: AmqpMessagePublisher.IdleTimerExpired: Idle timeout: 00:10:00.", None]],
[2, True, [b'amqp:link:detach-forced', None, b'something random']],
[2, True, [b'amqp:link:detach-forced', None, None]],
[2, True, [b'amqp:link:detach-forced']],
],
ids=["description and info", "info only", "description only", "no info or description"],
)
def test_detach_with_error(frame):
'''
A detach can optionally include an description and info field.
https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#type-error
'''
session = None
link = Link(
session,
3,
name="test_link",
role=True,
source_address="test_source",
target_address="test_target",
network_trace=False,
network_trace_params={},
)
link._set_state(LinkState.DETACH_RCVD)
link._incoming_detach(frame)
with pytest.raises(AMQPLinkError) as ae:
link.get_state()
assert ae.description == frame[2][1]
assert ae.info == frame[2][2]
|