File: test_client_creation_async.py

package info (click to toggle)
python-azure 20230112%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 749,544 kB
  • sloc: python: 6,815,827; javascript: 287; makefile: 195; xml: 109; sh: 105
file content (126 lines) | stat: -rw-r--r-- 4,253 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
from azure.eventhub.aio import EventHubProducerClient, EventHubConsumerClient
from azure.eventhub import TransportType


def test_custom_endpoint_async():
    producer = EventHubProducerClient(
        "fake.host.com",
        "fake_eh",
        None,
    )
    assert not producer._config.custom_endpoint_hostname
    assert producer._config.transport_type == TransportType.Amqp
    assert producer._config.connection_port == 5671

    producer = EventHubProducerClient(
        "fake.host.com",
        "fake_eh",
        None,
        custom_endpoint_address="https://12.34.56.78"
    )
    assert producer._config.custom_endpoint_hostname == '12.34.56.78'
    assert producer._config.transport_type == TransportType.AmqpOverWebsocket
    assert producer._config.connection_port == 443

    producer = EventHubProducerClient(
        "fake.host.com",
        "fake_eh",
        None,
        custom_endpoint_address="sb://fake.endpoint.com:443"
    )
    assert producer._config.custom_endpoint_hostname == 'fake.endpoint.com'
    assert producer._config.transport_type == TransportType.AmqpOverWebsocket
    assert producer._config.connection_port == 443

    producer = EventHubProducerClient(
        "fake.host.com",
        "fake_eh",
        None,
        custom_endpoint_address="https://fake.endpoint.com:200"
    )
    assert producer._config.custom_endpoint_hostname == 'fake.endpoint.com'
    assert producer._config.transport_type == TransportType.AmqpOverWebsocket
    assert producer._config.connection_port == 200

    producer = EventHubProducerClient(
        "fake.host.com",
        "fake_eh",
        None,
        custom_endpoint_address="fake.endpoint.com:200"
    )
    assert producer._config.custom_endpoint_hostname == 'fake.endpoint.com'
    assert producer._config.transport_type == TransportType.AmqpOverWebsocket
    assert producer._config.connection_port == 200

    consumer = EventHubConsumerClient(
        "fake.host.com",
        "fake_eh",
        "fake_group",
        None,
    )
    assert not consumer._config.custom_endpoint_hostname
    assert consumer._config.transport_type == TransportType.Amqp
    assert consumer._config.connection_port == 5671

    consumer = EventHubConsumerClient(
        "fake.host.com",
        "fake_eh",
        "fake_group",
        None,
        custom_endpoint_address="https://12.34.56.78/"
    )
    assert consumer._config.custom_endpoint_hostname == '12.34.56.78'
    assert consumer._config.transport_type == TransportType.AmqpOverWebsocket
    assert consumer._config.connection_port == 443

    consumer = EventHubConsumerClient(
        "fake.host.com",
        "fake_eh",
        "fake_group",
        None,
        custom_endpoint_address="sb://fake.endpoint.com:443"
    )
    assert consumer._config.custom_endpoint_hostname == 'fake.endpoint.com'
    assert consumer._config.transport_type == TransportType.AmqpOverWebsocket
    assert consumer._config.connection_port == 443

    consumer = EventHubConsumerClient(
        "fake.host.com",
        "fake_eh",
        "fake_group",
        None,
        custom_endpoint_address="https://fake.endpoint.com:200"
    )
    assert consumer._config.custom_endpoint_hostname == 'fake.endpoint.com'
    assert consumer._config.transport_type == TransportType.AmqpOverWebsocket
    assert consumer._config.connection_port == 200

    consumer = EventHubConsumerClient(
        "fake.host.com",
        "fake_eh",
        "fake_group",
        None,
        custom_endpoint_address="fake.endpoint.com:200"
    )
    assert consumer._config.custom_endpoint_hostname == 'fake.endpoint.com'
    assert consumer._config.transport_type == TransportType.AmqpOverWebsocket
    assert consumer._config.connection_port == 200


def test_custom_certificate_async():
    producer = EventHubProducerClient(
        "fake.host.com",
        "fake_eh",
        None,
        connection_verify='/usr/bin/local/cert'
    )
    assert producer._config.connection_verify == '/usr/bin/local/cert'

    consumer = EventHubConsumerClient(
        "fake.host.com",
        "fake_eh",
        "fake_group",
        None,
        connection_verify='D:/local/certfile'
    )
    assert consumer._config.connection_verify == 'D:/local/certfile'