File: test_azure_iothub_receive2.py

package info (click to toggle)
azure-uamqp-python 1.6.11-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 35,564 kB
  • sloc: ansic: 184,383; cpp: 7,738; python: 7,731; cs: 5,767; sh: 983; xml: 298; makefile: 34
file content (104 lines) | stat: -rw-r--r-- 3,647 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------

import os
import logging
import sys
from base64 import b64encode, b64decode
from hashlib import sha256
from hmac import HMAC
from time import time
try:
    from urllib import quote, quote_plus, urlencode #Py2
except Exception:
    from urllib.parse import quote, quote_plus, urlencode

import uamqp
from uamqp import utils, errors
from uamqp import authentication


def get_logger(level):
    """Return the "uamqp" logger configured at the given level.

    A stdout stream handler with a timestamped format is attached only when
    the logger has no handlers yet, so repeated calls do not stack up
    duplicate handlers. The level is (re)applied on every call.

    :param level: Logging level to set on the logger (e.g. ``logging.INFO``).
    :returns: The ``logging.Logger`` instance named "uamqp".
    """
    logger = logging.getLogger("uamqp")
    already_configured = bool(logger.handlers)
    if not already_configured:
        formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
        stdout_handler = logging.StreamHandler(stream=sys.stdout)
        stdout_handler.setFormatter(formatter)
        logger.addHandler(stdout_handler)
    logger.setLevel(level)
    return logger


# Module-level logger used by this script: the "uamqp" logger at INFO level.
log = get_logger(logging.INFO)


def _generate_sas_token(uri, policy, key, expiry=None):
    """Build a ``SharedAccessSignature`` token string for the given resource.

    The signature is an HMAC-SHA256 over ``"<url-encoded uri>\\n<expiry>"``,
    keyed with the base64-decoded ``key`` and then base64-encoded.

    :param uri: Resource URI placed in the ``sr`` field.
    :param policy: Policy name placed in the ``skn`` field; omitted when falsy.
    :param key: Base64-encoded signing key.
    :param expiry: Expiry as a Unix timestamp; any falsy value falls back to
        one hour from now.
    :returns: ``'SharedAccessSignature '`` followed by the URL-encoded fields.
    """
    deadline = expiry or time() + 3600  # Default to 1 hour.
    resource = quote_plus(uri)
    expires_at = int(deadline)
    string_to_sign = '\n'.join((resource, str(expires_at)))
    digest = HMAC(b64decode(key), string_to_sign.encode('utf-8'), sha256).digest()
    fields = {'sr': uri, 'sig': b64encode(digest), 'se': str(expires_at)}
    if policy:
        fields['skn'] = policy
    return 'SharedAccessSignature ' + urlencode(fields)


def _build_iothub_amqp_endpoint_from_target(target):
    """Derive the SASL username and SAS token for an IoT Hub target.

    :param target: Mapping providing ``'hostname'``, ``'key_name'`` and
        ``'access_key'``.
    :returns: A ``(username, sas_token)`` tuple. The username has the form
        ``"<key_name>@sas.root.<hub name>"``, where the hub name is the first
        dot-separated label of the hostname. The token is generated for the
        hostname and expires 360 seconds from now.
    """
    hostname = target['hostname']
    policy = target['key_name']
    hub_name, _, _ = hostname.partition('.')
    username = "{}@sas.root.{}".format(policy, hub_name)
    access_key = target['access_key']
    sas_token = _generate_sas_token(hostname, policy, access_key, time() + 360)
    return username, sas_token


def _receive_message(conn, source, auth):
    """Receive one batch of messages from ``source`` over an existing connection.

    A ``uamqp.ReceiveClient`` is created (timeout=5000, prefetch=50), opened
    on ``conn`` and asked for a batch of at most 10 messages. The client is
    always closed before returning.

    :param conn: Connection passed to ``ReceiveClient.open``.
    :param source: Source address to receive from.
    :param auth: Authentication object passed to the receive client.
    :returns: The received batch, or the ``errors.LinkRedirect`` instance if
        one was raised while opening or receiving.
    """
    client = uamqp.ReceiveClient(source, auth=auth, debug=False, timeout=5000, prefetch=50)
    try:
        client.open(connection=conn)
        return client.receive_message_batch(max_batch_size=10)
    except errors.LinkRedirect as redirect:
        # The redirect is handed back to the caller rather than re-raised.
        return redirect
    finally:
        client.close()


def test_iothub_client_receive_sync(live_iothub_config):
    """Receive from an IoT Hub events endpoint, following the link redirect.

    The first receive attempt against the hub hostname is expected to come
    back as an ``errors.LinkRedirect``; the connection is then redirected and
    a second receive is issued against the redirect address.

    :param live_iothub_config: Mapping providing ``'hostname'``,
        ``'key_name'``, ``'access_key'``, ``'consumer_group'`` and
        ``'partition'``.
    """
    operation = '/messages/events/ConsumerGroups/{}/Partitions/{}'.format(
        live_iothub_config['consumer_group'],
        live_iothub_config['partition'])
    auth = authentication.SASLPlain(
        live_iothub_config['hostname'],
        *_build_iothub_amqp_endpoint_from_target(live_iothub_config))

    source = 'amqps://' + live_iothub_config['hostname'] + operation
    log.info("Source: %s", source)
    with uamqp.Connection(live_iothub_config['hostname'], auth, debug=False) as conn:
        result = _receive_message(conn, source, auth)
        # _receive_message returns a message batch when no redirect happens;
        # fail with a clear message instead of an AttributeError on
        # ``result.hostname`` below.
        assert isinstance(result, errors.LinkRedirect), (
            "Expected a link redirect from {}, got {!r}".format(
                live_iothub_config['hostname'], result))
        new_auth = authentication.SASLPlain(
            result.hostname,
            live_iothub_config['key_name'],
            live_iothub_config['access_key'])
        conn.redirect(result, new_auth)
        result = _receive_message(conn, result.address, new_auth)
    log.info("Finished receiving")


if __name__ == '__main__':
    # When run as a script, read the live IoT Hub settings from the
    # environment (a missing variable raises KeyError) and run the receive
    # test against the default consumer group, partition "0".
    env = os.environ
    config = {
        'hostname': env['IOTHUB_HOSTNAME'],
        'device': env['IOTHUB_DEVICE'],
        'key_name': env['IOTHUB_SAS_POLICY'],
        'access_key': env['IOTHUB_SAS_KEY'],
        'consumer_group': "$Default",
        'partition': "0",
    }

    test_iothub_client_receive_sync(config)