1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
|
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import pytest
import time
from azure.identity import EnvironmentCredential, DefaultAzureCredential
from azure.eventhub import EventHubProducerClient, EventHubSharedKeyCredential
from azure.eventhub._client_base import EventHubSASTokenCredential
from azure.core.credentials import AzureSasCredential, AzureNamedKeyCredential
@pytest.mark.livetest
def test_mgmt_call_conn_str(connstr_receivers):
connection_str, receivers = connstr_receivers
client = EventHubProducerClient.from_connection_string(connection_str)
client._start_producer("0", 60)
@pytest.mark.livetest
def test_mgmt_call_token_credential(live_eventhub, get_credential):
credential = get_credential()
client = EventHubProducerClient(
fully_qualified_namespace=live_eventhub["hostname"],
eventhub_name=live_eventhub["event_hub"],
credential=credential,
user_agent="customized information",
)
client._start_producer("0", 60)
@pytest.mark.livetest
def test_mgmt_call_sas(live_eventhub):
hostname = live_eventhub["hostname"]
credential = EventHubSharedKeyCredential(live_eventhub["key_name"], live_eventhub["access_key"])
auth_uri = "sb://{}/{}".format(hostname, live_eventhub["event_hub"])
token = credential.get_token(auth_uri).token
client = EventHubProducerClient(
fully_qualified_namespace=hostname,
eventhub_name=live_eventhub["event_hub"],
credential=EventHubSASTokenCredential(token, time.time() + 3000),
)
client._start_producer("0", 60)
assert True
@pytest.mark.livetest
def test_mgmt_call_sas_credential(live_eventhub):
hostname = live_eventhub["hostname"]
credential = EventHubSharedKeyCredential(live_eventhub["key_name"], live_eventhub["access_key"])
auth_uri = "sb://{}/{}".format(hostname, live_eventhub["event_hub"])
token = credential.get_token(auth_uri).token
client = EventHubProducerClient(
fully_qualified_namespace=hostname,
eventhub_name=live_eventhub["event_hub"],
credential=AzureSasCredential(token),
)
client._start_producer("0", 60)
assert True
@pytest.mark.livetest
def test_mgmt_call_azure_named_key_credential(live_eventhub):
credential = AzureNamedKeyCredential(live_eventhub["key_name"], live_eventhub["access_key"])
client = EventHubProducerClient(
fully_qualified_namespace=live_eventhub["hostname"],
eventhub_name=live_eventhub["event_hub"],
credential=credential,
)
client._start_producer("0", 60)
assert True
|