File: test_azure_event_hubs_mgmt_op.py

package info (click to toggle)
azure-uamqp-python 1.6.11-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 35,564 kB
  • sloc: ansic: 184,383; cpp: 7,738; python: 7,731; cs: 5,767; sh: 983; xml: 298; makefile: 34
file content (30 lines) | stat: -rw-r--r-- 1,307 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------

import os
import logging
import sys

import uamqp
from uamqp import authentication


def test_event_hubs_mgmt_op(live_eventhub_config):
    """Send a management READ request to an Event Hub and check its partitions.

    Builds SAS-token authentication from the shared access key in the
    supplied configuration, opens an ``AMQPClient`` against the hub's
    ``amqps://`` endpoint, issues a ``READ`` management operation of type
    ``com.microsoft:eventhub`` and asserts that the response data lists the
    partition ids ``b"0"`` and ``b"1"``.

    :param live_eventhub_config: Mapping read for the keys ``hostname``,
     ``event_hub``, ``key_name`` and ``access_key`` (presumably a pytest
     fixture defined outside this file -- confirm against conftest).
    """
    hostname = live_eventhub_config['hostname']
    hub_name = live_eventhub_config['event_hub']

    # The SAS token is created for the hub's sb:// URI.
    audience = "sb://{}/{}".format(hostname, hub_name)
    token_auth = authentication.SASTokenAuth.from_shared_access_key(
        audience,
        live_eventhub_config['key_name'],
        live_eventhub_config['access_key'],
    )

    # Same host and hub, but addressed over the amqps:// scheme for the client.
    endpoint = "amqps://{}/{}".format(hostname, hub_name)

    # Field names passed explicitly so the client knows where to find the
    # status code and description in the response.
    mgmt_options = {
        'op_type': b'com.microsoft:eventhub',
        'status_code_field': b'status-code',
        'description_fields': b'status-description',
    }

    with uamqp.AMQPClient(endpoint, auth=token_auth, debug=False) as client:
        request = uamqp.Message(application_properties={'name': hub_name})
        reply = client.mgmt_request(request, b'READ', **mgmt_options)
        body = reply.get_data()
        assert body[b'partition_ids'] == [b"0", b"1"]