1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
|
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------
import os
import logging
import asyncio
import pytest
import sys
from base64 import b64encode, b64decode
from hashlib import sha256
from hmac import HMAC
from time import time
from uuid import uuid4
try:
from urllib import quote, quote_plus, urlencode #Py2
except Exception:
from urllib.parse import quote, quote_plus, urlencode
import uamqp
def get_logger(level):
uamqp_logger = logging.getLogger("uamqp")
if not uamqp_logger.handlers:
handler = logging.StreamHandler(stream=sys.stdout)
handler.setFormatter(logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s'))
uamqp_logger.addHandler(handler)
uamqp_logger.setLevel(level)
return uamqp_logger
log = get_logger(logging.INFO)
def _generate_sas_token(uri, policy, key, expiry=None):
if not expiry:
expiry = time() + 3600 # Default to 1 hour.
encoded_uri = quote_plus(uri)
ttl = int(expiry)
sign_key = '%s\n%d' % (encoded_uri, ttl)
signature = b64encode(HMAC(b64decode(key), sign_key.encode('utf-8'), sha256).digest())
result = {
'sr': uri,
'sig': signature,
'se': str(ttl)}
if policy:
result['skn'] = policy
return 'SharedAccessSignature ' + urlencode(result)
def _build_iothub_amqp_endpoint_from_target(target):
hub_name = target['hostname'].split('.')[0]
endpoint = "{}@sas.root.{}".format(target['key_name'], hub_name)
endpoint = quote_plus(endpoint)
sas_token = _generate_sas_token(target['hostname'], target['key_name'],
target['access_key'], time() + 360)
endpoint = endpoint + ":{}@{}".format(quote_plus(sas_token), target['hostname'])
return endpoint
@pytest.mark.asyncio
async def test_iot_hub_send_async(live_iothub_config):
msg_content = b"hello world"
msg_props = uamqp.message.MessageProperties()
msg_props.to = '/devices/{}/messages/devicebound'.format(live_iothub_config['device'])
msg_props.message_id = str(uuid4())
message = uamqp.Message(msg_content, properties=msg_props)
operation = '/messages/devicebound'
endpoint = _build_iothub_amqp_endpoint_from_target(live_iothub_config)
target = 'amqps://' + endpoint + operation
log.info("Target: {}".format(target))
send_client = uamqp.SendClientAsync(target, debug=False)
send_client.queue_message(message)
results = await send_client.send_all_messages_async()
assert not [m for m in results if m == uamqp.constants.MessageState.SendFailed]
if __name__ == '__main__':
config = {}
config['hostname'] = os.environ['IOTHUB_HOSTNAME']
config['device'] = os.environ['IOTHUB_DEVICE']
config['key_name'] = os.environ['IOTHUB_SAS_POLICY']
config['access_key'] = os.environ['IOTHUB_SAS_KEY']
loop = asyncio.get_event_loop()
loop.run_until_complete(test_iot_hub_send_async(config))
|