File: test_auth_async.py

package info (click to toggle)
python-azure 20201208%2Bgit-6
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 1,437,920 kB
  • sloc: python: 4,287,452; javascript: 269; makefile: 198; sh: 187; xml: 106
file content (101 lines) | stat: -rw-r--r-- 5,142 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------

import pytest
import asyncio
import datetime
import time

from azure.identity.aio import EnvironmentCredential
from azure.eventhub import EventData
from azure.eventhub.aio import EventHubConsumerClient, EventHubProducerClient, EventHubSharedKeyCredential
from azure.eventhub.aio._client_base_async import EventHubSASTokenCredential

from devtools_testutils import AzureMgmtTestCase, CachedResourceGroupPreparer
from tests.eventhub_preparer import (
    CachedEventHubNamespacePreparer, 
    CachedEventHubPreparer
)

@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_client_secret_credential_async(live_eventhub):
    """Send and receive one event using an ``EnvironmentCredential``.

    A producer and a consumer are built from the same credential. One event
    is sent to partition ``'0'``; the consumer then reads that partition with
    ``starting_position='-1'`` and the callback records what it saw on its
    own function attributes so the assertions can inspect it afterwards.

    :param live_eventhub: fixture mapping; the ``'hostname'`` and
        ``'event_hub'`` entries are read here.
    """
    credential = EnvironmentCredential()

    # Async azure-identity credentials are async context managers; scoping the
    # credential here makes sure it is closed even when an assertion fails.
    async with credential:
        producer_client = EventHubProducerClient(fully_qualified_namespace=live_eventhub['hostname'],
                                                 eventhub_name=live_eventhub['event_hub'],
                                                 credential=credential,
                                                 user_agent='customized information')
        consumer_client = EventHubConsumerClient(fully_qualified_namespace=live_eventhub['hostname'],
                                                 eventhub_name=live_eventhub['event_hub'],
                                                 consumer_group='$default',
                                                 credential=credential,
                                                 user_agent='customized information')

        async with producer_client:
            batch = await producer_client.create_batch(partition_id='0')
            batch.add(EventData(body='A single message'))
            await producer_client.send_batch(batch)

        # The asyncio consumer documents ``on_event`` as an awaitable callback,
        # so it is declared as a coroutine function rather than a plain def.
        async def on_event(partition_context, event):
            on_event.called = True
            on_event.partition_id = partition_context.partition_id
            on_event.event = event
        on_event.called = False

        async with consumer_client:
            # receive() keeps running until the client is closed, so it is
            # scheduled as a background task and given a fixed window to
            # deliver the event before the ``async with`` exit closes the client.
            task = asyncio.ensure_future(
                consumer_client.receive(on_event, partition_id='0', starting_position='-1'))
            await asyncio.sleep(13)
        # Surface any exception raised inside the receive task.
        await task

    assert on_event.called is True
    assert on_event.partition_id == "0"
    assert list(on_event.event.body)[0] == 'A single message'.encode('utf-8')


class AsyncEventHubAuthTests(AzureMgmtTestCase):
    """Live tests for SAS-based authentication of the async producer client."""

    @staticmethod
    async def _send_single_message(producer_client):
        """Open *producer_client*, send one event to partition '0', then close it.

        Not collected as a test (name does not start with ``test``); it only
        factors out the send sequence shared by every authentication variant.
        """
        async with producer_client:
            batch = await producer_client.create_batch(partition_id='0')
            batch.add(EventData(body='A single message'))
            await producer_client.send_batch(batch)

    @pytest.mark.liveTest
    @pytest.mark.live_test_only
    @CachedResourceGroupPreparer(name_prefix='eventhubtest')
    @CachedEventHubNamespacePreparer(name_prefix='eventhubtest')
    @CachedEventHubPreparer(name_prefix='eventhubtest')
    async def test_client_sas_credential_async(self,
                                   eventhub,
                                   eventhub_namespace,
                                   eventhub_namespace_key_name,
                                   eventhub_namespace_primary_key,
                                   eventhub_namespace_connection_string,
                                   **kwargs):
        """Send one event with each of three authentication styles.

        1. A namespace connection string (known-good baseline).
        2. A SAS token wrapped in ``EventHubSASTokenCredential``.
        3. A connection string that embeds that same SAS token.

        :param eventhub: prepared event hub; only ``.name`` is read.
        :param eventhub_namespace: prepared namespace; only ``.name`` is read.
        :param eventhub_namespace_key_name: shared access key name used to
            build the ``EventHubSharedKeyCredential``.
        :param eventhub_namespace_primary_key: key paired with the name above.
        :param eventhub_namespace_connection_string: connection string for the
            baseline producer.
        :param kwargs: extra preparer-supplied values; unused here.
        """
        hostname = "{}.servicebus.windows.net".format(eventhub_namespace.name)

        # This should "just work" to validate known-good.
        producer_client = EventHubProducerClient.from_connection_string(
            eventhub_namespace_connection_string, eventhub_name=eventhub.name)
        await self._send_single_message(producer_client)

        # This should also work, but now using SAS tokens.
        credential = EventHubSharedKeyCredential(eventhub_namespace_key_name, eventhub_namespace_primary_key)
        auth_uri = "sb://{}/{}".format(hostname, eventhub.name)
        token = (await credential.get_token(auth_uri)).token
        # Second argument is "now + 3000 seconds" -- presumably the token's
        # expiry timestamp; confirm against EventHubSASTokenCredential.
        producer_client = EventHubProducerClient(fully_qualified_namespace=hostname,
                                                 eventhub_name=eventhub.name,
                                                 credential=EventHubSASTokenCredential(token, time.time() + 3000))
        await self._send_single_message(producer_client)

        # Finally let's do it with SAS token + conn str
        token_conn_str = "Endpoint=sb://{}/;SharedAccessSignature={};".format(hostname, token.decode())
        conn_str_producer_client = EventHubProducerClient.from_connection_string(token_conn_str,
                                                                                 eventhub_name=eventhub.name)
        await self._send_single_message(conn_str_producer_client)