File: test_azure_event_hubs_mgmt_op_async.py

package info (click to toggle)
azure-uamqp-python 1.6.11-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 35,564 kB
  • sloc: ansic: 184,383; cpp: 7,738; python: 7,731; cs: 5,767; sh: 983; xml: 298; makefile: 34
file content (45 lines) | stat: -rw-r--r-- 1,714 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------

import os
import logging
import asyncio
import pytest
import sys

import uamqp
from uamqp import authentication


def get_logger(level):
    """Return the ``"uamqp"`` logger with its level set to *level*.

    A stdout stream handler with a timestamped format is attached only when
    the logger has no handlers yet, so calling this repeatedly does not add
    further handlers. The level is applied on every call.

    :param level: Logging level passed to ``Logger.setLevel``.
    :returns: The ``logging.Logger`` registered under the name ``"uamqp"``.
    """
    logger = logging.getLogger("uamqp")
    needs_handler = not logger.handlers
    if needs_handler:
        line_format = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
        stdout_handler = logging.StreamHandler(stream=sys.stdout)
        stdout_handler.setFormatter(line_format)
        logger.addHandler(stdout_handler)
    logger.setLevel(level)
    return logger


# Configure the shared "uamqp" logger at INFO level when this module is imported.
log = get_logger(logging.INFO)


@pytest.mark.asyncio
async def test_event_hubs_mgmt_op_async(live_eventhub_config):
    """Send a READ management request to an Event Hub and check its partition ids.

    Opens an ``AMQPClientAsync`` against ``amqps://<hostname>/<event_hub>``
    using SASL PLAIN credentials, issues a ``com.microsoft:eventhub`` READ
    management operation naming the event hub, and asserts that the response
    data lists partition ids ``[b"0", b"1"]``.

    :param live_eventhub_config: Mapping read for the keys ``hostname``,
        ``key_name``, ``access_key`` and ``event_hub``. Presumably supplied
        by a pytest fixture defined outside this file -- verify in conftest.
    """
    hostname = live_eventhub_config['hostname']
    sasl_auth = authentication.SASLPlain(
        hostname,
        live_eventhub_config['key_name'],
        live_eventhub_config['access_key'])

    event_hub = live_eventhub_config['event_hub']
    endpoint = "amqps://{}/{}".format(hostname, event_hub)

    # Keyword options forwarded unchanged to the management request call.
    request_options = {
        'op_type': b'com.microsoft:eventhub',
        'status_code_field': b'status-code',
        'description_fields': b'status-description',
        'timeout': 3000,
    }

    async with uamqp.AMQPClientAsync(endpoint, auth=sasl_auth, debug=False) as mgmt_client:
        request = uamqp.Message(application_properties={'name': event_hub})
        reply = await mgmt_client.mgmt_request_async(request, b'READ', **request_options)
        hub_info = reply.get_data()
        assert hub_info[b'partition_ids'] == [b"0", b"1"]