File: test_faultinjector.py

package info (click to toggle)
python-azure 20251014%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky
  • size: 766,472 kB
  • sloc: python: 6,314,744; ansic: 804; javascript: 287; makefile: 198; sh: 198; xml: 109
file content (75 lines) | stat: -rw-r--r-- 2,627 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import logging
import ssl
import threading
import pytest

from azure.eventhub import EventData
from azure.eventhub import EventHubConsumerClient


@pytest.mark.liveTest
@pytest.mark.parametrize(
    "faultinjector",
    [
        {
            "faultinjector_args": [
                "detach_after_delay",
                # You can pass any condition here. The default is amqp:link:detach-forced
                # "--cond", "amqp:link:ohno",
                "--desc",
                "DETACHED FOR FAULT INJECTOR TEST",
            ]
        }
    ],
    indirect=True,
)
def test_receive_partition_using_fault_injector_detach_after_delay(
    auth_credential_senders, faultinjector
):
    client_args = faultinjector
    assert client_args, "client_args is empty, fault injector wasn't started"

    fully_qualified_namespace, eventhub_name, credential, senders = (
        auth_credential_senders
    )

    senders[0].send(EventData("Test EventData"))

    # Uncomment to enable DEBUG logging to get more information on frames so you can actually see the DETACH message we're using.
    # You'll see lines like this one (where I customized the AMQP status code as well as the message):
    # 2025-05-27 19:35:21,448 azure.eventhub._pyamqp.link DEBUG    <- DetachFrame(handle=1, closed=True, error=[b'amqp:link:ohno', b'DETACHED FOR FAULT INJECTOR TEST'])
    # logger = logging.getLogger("azure.eventhub")
    # logger.setLevel(logging.DEBUG)
    # client_args["logging_enable"] = True

    client = EventHubConsumerClient(
        fully_qualified_namespace=fully_qualified_namespace,
        eventhub_name=eventhub_name,
        credential=credential(),
        consumer_group="$default",
        **client_args,
    )

    def on_event(partition_context, _):
        on_event.received += 1
        on_event.partition_id = partition_context.partition_id
        on_event.consumer_group = partition_context.consumer_group
        on_event.fully_qualified_namespace = partition_context.fully_qualified_namespace
        on_event.eventhub_name = partition_context.eventhub_name

    on_event.received = 0

    with client:
        worker = threading.Thread(
            target=client.receive,
            args=(on_event,),
            kwargs={"starting_position": "-1", "partition_id": "0"},
        )
        worker.start()
        worker.join(timeout=10)

        assert on_event.received == 1
        assert on_event.partition_id == "0"
        assert on_event.consumer_group == "$default"
        assert on_event.fully_qualified_namespace == fully_qualified_namespace
        assert on_event.eventhub_name == eventhub_name