File: test_azure_iothub_send_async.py

package info (click to toggle)
azure-uamqp-python 1.6.11-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 35,564 kB
  • sloc: ansic: 184,383; cpp: 7,738; python: 7,731; cs: 5,767; sh: 983; xml: 298; makefile: 34
file content (92 lines) | stat: -rw-r--r-- 3,192 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------

import os
import logging
import asyncio
import pytest
import sys
from base64 import b64encode, b64decode
from hashlib import sha256
from hmac import HMAC
from time import time
from uuid import uuid4
try:
    from urllib import quote, quote_plus, urlencode #Py2
except Exception:
    from urllib.parse import quote, quote_plus, urlencode

import uamqp


def get_logger(level):
    uamqp_logger = logging.getLogger("uamqp")
    if not uamqp_logger.handlers:
        handler = logging.StreamHandler(stream=sys.stdout)
        handler.setFormatter(logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s'))
        uamqp_logger.addHandler(handler)
    uamqp_logger.setLevel(level)
    return uamqp_logger


log = get_logger(logging.INFO)


def _generate_sas_token(uri, policy, key, expiry=None):
    if not expiry:
        expiry = time() + 3600  # Default to 1 hour.
    encoded_uri = quote_plus(uri)
    ttl = int(expiry)
    sign_key = '%s\n%d' % (encoded_uri, ttl)
    signature = b64encode(HMAC(b64decode(key), sign_key.encode('utf-8'), sha256).digest())
    result = {
        'sr': uri,
        'sig': signature,
        'se': str(ttl)}
    if policy:
        result['skn'] = policy
    return 'SharedAccessSignature ' + urlencode(result)


def _build_iothub_amqp_endpoint_from_target(target):
    hub_name = target['hostname'].split('.')[0]
    endpoint = "{}@sas.root.{}".format(target['key_name'], hub_name)
    endpoint = quote_plus(endpoint)
    sas_token = _generate_sas_token(target['hostname'], target['key_name'],
                                    target['access_key'], time() + 360)
    endpoint = endpoint + ":{}@{}".format(quote_plus(sas_token), target['hostname'])
    return endpoint


@pytest.mark.asyncio
async def test_iot_hub_send_async(live_iothub_config):
    msg_content = b"hello world"
    msg_props = uamqp.message.MessageProperties()
    msg_props.to = '/devices/{}/messages/devicebound'.format(live_iothub_config['device'])
    msg_props.message_id = str(uuid4())
    message = uamqp.Message(msg_content, properties=msg_props)

    operation = '/messages/devicebound'
    endpoint = _build_iothub_amqp_endpoint_from_target(live_iothub_config)

    target = 'amqps://' + endpoint + operation
    log.info("Target: {}".format(target))

    send_client = uamqp.SendClientAsync(target, debug=False)
    send_client.queue_message(message)
    results = await send_client.send_all_messages_async()
    assert not [m for m in results if m == uamqp.constants.MessageState.SendFailed]


if __name__ == '__main__':
    config = {}
    config['hostname'] = os.environ['IOTHUB_HOSTNAME']
    config['device'] = os.environ['IOTHUB_DEVICE']
    config['key_name'] = os.environ['IOTHUB_SAS_POLICY']
    config['access_key'] = os.environ['IOTHUB_SAS_KEY']

    loop = asyncio.get_event_loop()
    loop.run_until_complete(test_iot_hub_send_async(config))