1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
|
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------
import pytest
import time
import threading
from azure.identity import EnvironmentCredential
from azure.eventhub import EventData, EventHubProducerClient, EventHubConsumerClient, EventHubSharedKeyCredential
from azure.eventhub._client_base import EventHubSASTokenCredential
from azure.core.credentials import AzureSasCredential, AzureNamedKeyCredential
@pytest.mark.liveTest
def test_client_secret_credential(live_eventhub, uamqp_transport):
    """Send one event and read it back using an ``EnvironmentCredential``.

    A producer and a consumer are built with the same credential. One event
    is sent to partition ``'0'``; the consumer's ``receive`` then runs on a
    background thread and the callback records what it was handed.

    :param live_eventhub: Fixture mapping; the ``'hostname'`` and
        ``'event_hub'`` entries are read here.
    :param uamqp_transport: Forwarded unchanged to both clients as the
        ``uamqp_transport`` keyword.
    """
    credential = EnvironmentCredential()

    # Keyword arguments shared by the producer and the consumer.
    shared_kwargs = dict(
        fully_qualified_namespace=live_eventhub['hostname'],
        eventhub_name=live_eventhub['event_hub'],
        credential=credential,
        user_agent='customized information',
        uamqp_transport=uamqp_transport,
    )
    producer_client = EventHubProducerClient(**shared_kwargs)
    consumer_client = EventHubConsumerClient(consumer_group='$default', **shared_kwargs)

    with producer_client:
        outgoing = producer_client.create_batch(partition_id='0')
        outgoing.add(EventData(body='A single message'))
        producer_client.send_batch(outgoing)

    # State written by the callback; each invocation overwrites the previous
    # one, so after the wait it describes the most recent event delivered.
    seen = {"called": False}

    def on_event(partition_context, event):
        seen["called"] = True
        seen["partition_id"] = partition_context.partition_id
        seen["event"] = event

    with consumer_client:
        worker = threading.Thread(
            target=consumer_client.receive,
            args=(on_event,),
            kwargs={"partition_id": '0', "starting_position": '-1'},
        )
        worker.start()
        # Fixed window for the background receive call to deliver events.
        time.sleep(13)

    # NOTE(review): joined only after the with-block has exited, on the
    # expectation that receive() returns once the client is closed - confirm.
    worker.join()

    assert seen["called"] is True
    assert seen["partition_id"] == "0"
    assert list(seen["event"].body)[0] == 'A single message'.encode('utf-8')
@pytest.mark.liveTest
def test_client_sas_credential(live_eventhub, uamqp_transport):
    """Send one event through three differently-authenticated producers.

    The three producers are, in order: one built from the fixture's
    connection string, one built with an ``EventHubSASTokenCredential``
    wrapping a token obtained from ``EventHubSharedKeyCredential``, and one
    built from a connection string that embeds that same token as a
    ``SharedAccessSignature``.

    :param live_eventhub: Fixture mapping; reads ``'hostname'``,
        ``'event_hub'``, ``'connection_str'``, ``'key_name'`` and
        ``'access_key'``.
    :param uamqp_transport: Forwarded unchanged to every producer as the
        ``uamqp_transport`` keyword.
    """

    def _send_single_message(client):
        # Open the client, push one event to partition '0', close on exit.
        with client:
            batch = client.create_batch(partition_id='0')
            batch.add(EventData(body='A single message'))
            client.send_batch(batch)

    hostname = live_eventhub['hostname']

    # This should "just work" to validate known-good.
    baseline_producer = EventHubProducerClient.from_connection_string(
        live_eventhub['connection_str'],
        eventhub_name=live_eventhub['event_hub'],
        uamqp_transport=uamqp_transport,
    )
    _send_single_message(baseline_producer)

    # This should also work, but now using SAS tokens.
    shared_key = EventHubSharedKeyCredential(live_eventhub['key_name'], live_eventhub['access_key'])
    auth_uri = "sb://{}/{}".format(hostname, live_eventhub['event_hub'])
    token = shared_key.get_token(auth_uri).token
    sas_producer = EventHubProducerClient(
        fully_qualified_namespace=hostname,
        eventhub_name=live_eventhub['event_hub'],
        # Second argument is the expiry handed to the credential: now + 3000.
        credential=EventHubSASTokenCredential(token, time.time() + 3000),
        uamqp_transport=uamqp_transport,
    )
    _send_single_message(sas_producer)

    # Finally let's do it with SAS token + conn str
    token_conn_str = "Endpoint=sb://{}/;SharedAccessSignature={};".format(hostname, token.decode())
    conn_str_producer_client = EventHubProducerClient.from_connection_string(
        token_conn_str,
        eventhub_name=live_eventhub['event_hub'],
        uamqp_transport=uamqp_transport,
    )
    _send_single_message(conn_str_producer_client)
@pytest.mark.liveTest
def test_client_azure_sas_credential(live_eventhub, uamqp_transport):
    """Send one event with a connection string, then with ``AzureSasCredential``.

    The SAS token is obtained from ``EventHubSharedKeyCredential.get_token``
    for the hub's ``sb://`` URI and decoded before being wrapped in
    ``AzureSasCredential``.

    :param live_eventhub: Fixture mapping; reads ``'hostname'``,
        ``'event_hub'``, ``'connection_str'``, ``'key_name'`` and
        ``'access_key'``.
    :param uamqp_transport: Forwarded unchanged to both producers as the
        ``uamqp_transport`` keyword.
    """

    def _send_single_message(client):
        # Open the client, push one event to partition '0', close on exit.
        with client:
            batch = client.create_batch(partition_id='0')
            batch.add(EventData(body='A single message'))
            client.send_batch(batch)

    hostname = live_eventhub['hostname']

    # This should "just work" to validate known-good.
    baseline_producer = EventHubProducerClient.from_connection_string(
        live_eventhub['connection_str'],
        eventhub_name=live_eventhub['event_hub'],
        uamqp_transport=uamqp_transport,
    )
    _send_single_message(baseline_producer)

    # This should also work, but now using SAS tokens.
    shared_key = EventHubSharedKeyCredential(live_eventhub['key_name'], live_eventhub['access_key'])
    auth_uri = "sb://{}/{}".format(hostname, live_eventhub['event_hub'])
    raw_token = shared_key.get_token(auth_uri).token
    token = raw_token.decode()
    sas_producer = EventHubProducerClient(
        fully_qualified_namespace=hostname,
        eventhub_name=live_eventhub['event_hub'],
        credential=AzureSasCredential(token),
        auth_timeout=3,
        uamqp_transport=uamqp_transport,
    )
    _send_single_message(sas_producer)
@pytest.mark.liveTest
def test_client_azure_named_key_credential(live_eventhub, uamqp_transport):
    """Check that ``AzureNamedKeyCredential.update`` takes effect on a live client.

    The same consumer client is queried three times with
    ``get_eventhub_properties``: with the real key (must succeed), after the
    credential is updated in place to ``("foo", "bar")`` (must raise), and
    after the real key is restored (must succeed again).

    :param live_eventhub: Fixture mapping; reads ``'hostname'``,
        ``'event_hub'``, ``'key_name'`` and ``'access_key'``.
    :param uamqp_transport: Forwarded unchanged to the consumer as the
        ``uamqp_transport`` keyword.
    """
    credential = AzureNamedKeyCredential(live_eventhub['key_name'], live_eventhub['access_key'])
    consumer_client = EventHubConsumerClient(
        fully_qualified_namespace=live_eventhub['hostname'],
        eventhub_name=live_eventhub['event_hub'],
        consumer_group='$default',
        credential=credential,
        user_agent='customized information',
        auth_timeout=3,
        uamqp_transport=uamqp_transport,
    )

    # Use the client as a context manager so it is closed even when one of
    # the assertions below fails.
    with consumer_client:
        assert consumer_client.get_eventhub_properties() is not None

        # Swap in a bogus name/key on the credential object the client
        # already holds; the next request is expected to fail.
        credential.update("foo", "bar")
        with pytest.raises(Exception):
            consumer_client.get_eventhub_properties()

        # Restore the real key on the same credential object.
        credential.update(live_eventhub['key_name'], live_eventhub['access_key'])
        assert consumer_client.get_eventhub_properties() is not None
|