1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
|
#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------
import os
import logging
import sys
from base64 import b64encode, b64decode
from hashlib import sha256
from hmac import HMAC
from time import time
try:
from urllib import quote, quote_plus, urlencode #Py2
except Exception:
from urllib.parse import quote, quote_plus, urlencode
import uamqp
from uamqp import utils, errors
from uamqp import authentication
### IotHub CLI Extension SAS token implementation
class SasTokenAuthentication():
"""
Shared Access Signature authorization for Azure IoT Hub.
Args:
uri (str): Uri of target resource.
shared_access_policy_name (str): Name of shared access policy.
shared_access_key (str): Shared access key.
expiry (int): Expiry of the token to be generated. Input should
be seconds since the epoch, in UTC. Default is an hour later from now.
"""
def __init__(self, uri, shared_access_policy_name, shared_access_key, expiry=None):
self.uri = uri
self.policy = shared_access_policy_name
self.key = shared_access_key
if expiry is None:
self.expiry = time() + 3600 # Default expiry is an hour later
else:
self.expiry = expiry
def generate_sas_token(self):
"""
Create a shared access signiture token as a string literal.
Returns:
result (str): SAS token as string literal.
"""
encoded_uri = quote_plus(self.uri)
ttl = int(self.expiry)
sign_key = '%s\n%d' % (encoded_uri, ttl)
signature = b64encode(HMAC(b64decode(self.key), sign_key.encode('utf-8'), sha256).digest())
result = {
'sr': self.uri,
'sig': signature,
'se': str(ttl)
}
if self.policy:
result['skn'] = self.policy
return 'SharedAccessSignature ' + urlencode(result)
def get_logger(level):
uamqp_logger = logging.getLogger("uamqp")
if not uamqp_logger.handlers:
handler = logging.StreamHandler(stream=sys.stdout)
handler.setFormatter(logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s'))
uamqp_logger.addHandler(handler)
uamqp_logger.setLevel(level)
return uamqp_logger
log = get_logger(logging.INFO)
def _build_iothub_amqp_endpoint_from_target(target):
hub_name = target['hostname'].split('.')[0]
endpoint = "{}@sas.root.{}".format(target['key_name'], hub_name)
endpoint = quote_plus(endpoint)
sas_token = SasTokenAuthentication(target['hostname'], target['key_name'],
target['access_key'], time() + 360).generate_sas_token()
endpoint = endpoint + ":{}@{}".format(quote_plus(sas_token), target['hostname'])
return endpoint
def _receive_message(receive_client, target):
try:
batch = receive_client.receive_message_batch(max_batch_size=10)
except errors.LinkRedirect as redirect:
new_auth = authentication.SASLPlain(redirect.hostname, target['key_name'], target['access_key'])
#new_auth = authentication.SASTokenAuth.from_shared_access_key(redirect.address.decode('utf-8'), target['key_name'], target['access_key'])
receive_client.redirect(redirect, new_auth)
batch = receive_client.receive_message_batch(max_batch_size=10)
while batch:
log.info("Got batch: {}".format(len(batch)))
assert len(batch) <= 10
for message in batch:
annotations = message.annotations
log.info("Sequence Number: {}".format(annotations.get(b'x-opt-sequence-number')))
batch = receive_client.receive_message_batch(max_batch_size=10)
def test_iothub_client_receive_sync(live_iothub_config):
operation = '/messages/events/ConsumerGroups/{}/Partitions/{}'.format(
live_iothub_config['consumer_group'],
live_iothub_config['partition'])
endpoint = _build_iothub_amqp_endpoint_from_target(live_iothub_config)
source = 'amqps://' + endpoint + operation
log.info("Source: {}".format(source))
receive_client = uamqp.ReceiveClient(source, debug=False, timeout=5000, prefetch=50)
try:
log.info("Created client, receiving...")
_receive_message(receive_client, live_iothub_config)
except Exception as e:
print(e)
raise
finally:
receive_client.close()
log.info("Finished receiving")
if __name__ == '__main__':
config = {}
config['hostname'] = os.environ['IOTHUB_HOSTNAME']
config['device'] = os.environ['IOTHUB_DEVICE']
config['key_name'] = os.environ['IOTHUB_SAS_POLICY']
config['access_key'] = os.environ['IOTHUB_SAS_KEY']
config['consumer_group'] = "$Default"
config['partition'] = "0"
test_iothub_client_receive_sync(config)
|