1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
|
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import pytest
from azure.eventhub._pyamqp import SendClient, Connection, authentication
from azure.eventhub._pyamqp.error import AMQPConnectionError
def test_client_creation_exceptions():
with pytest.raises(TypeError):
sender = SendClient(
"fake.host.com",
)
assert sender._hostname == "fake.host.com"
def test_connection_endpoint_exceptions():
with pytest.raises(AMQPConnectionError):
endpoint = "sb://fake.host.com"
connection = Connection(endpoint)
connection.open()
def test_connection_sas_authentication_exception():
uri = "sb://{}/{}".format("fake.host.come", "fake_eh")
target = "amqps://{}/{}/Partitions/{}".format("fake.host.com", "fake_eh", "0")
sas_auth = authentication.SASTokenAuth(uri=uri, audience=uri, username="key", password="")
with pytest.raises(AttributeError):
sender = SendClient("fake.host.com", target, auth=sas_auth)
sender.client_ready()
def test_connection_sasl_annon_authentication_exception():
target = "amqps://{}/{}/Partitions/{}".format("fake.host.com", "fake_eh", "0")
sas_auth = authentication.SASLAnonymousCredential()
with pytest.raises(AttributeError):
sender = SendClient("fake.host.com", target, auth=sas_auth)
sender.client_ready()
|