1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
|
#!/usr/bin/env python
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
"""
Example to demonstrate utilizing SAS (Shared Access Signature) tokens to authenticate with Event Hubs.
"""
import os
import time
import hmac
import hashlib
import base64
from urllib.parse import quote as url_parse_quote
from azure.core.credentials import AccessToken
from azure.eventhub import EventHubProducerClient, EventData
def generate_sas_token(uri, sas_name, sas_value, token_ttl):
    """Create a Shared Access Signature token string from a SAS policy key.

    The signed payload is ``uri``, a newline, and the absolute expiry in
    whole epoch seconds; it is signed with HMAC-SHA256 using the key.

    :param str uri: The resource URI the token is issued for.
    :param str sas_name: The name of the SAS policy (emitted as ``skn``).
    :param str sas_value: The SAS policy key used as the HMAC secret.
    :param token_ttl: Lifetime of the token in seconds, counted from now.
    :return: The token in ``SharedAccessSignature sr=..&sig=..&se=..&skn=..`` form.
    :rtype: str
    """
    key_bytes = sas_value.encode("utf-8")
    # Absolute expiry, truncated to whole seconds and kept as text because it
    # is both part of the signed payload and a field of the final token.
    expires_at = str(int(time.time() + token_ttl))
    payload = "\n".join((uri, expires_at)).encode("utf-8")
    mac = hmac.new(key_bytes, payload, hashlib.sha256)
    # Base64 output may contain '+' and '=', so it is percent-encoded before
    # being embedded in the token.
    encoded_signature = url_parse_quote(base64.b64encode(mac.digest()))
    token_template = "SharedAccessSignature sr={}&sig={}&se={}&skn={}"
    return token_template.format(uri, encoded_signature, expires_at, sas_name)
class CustomizedSASCredential:
    """Credential that wraps an already-generated SAS token.

    The token and its expiry are stored as given; ``get_token`` always hands
    back that same pair and never generates a new token.
    """

    def __init__(self, token, expiry):
        """
        :param str token: The token string
        :param float expiry: The epoch timestamp
        """
        self.token, self.expiry = token, expiry
        # NOTE(review): presumably read by the client to recognise this
        # credential as a SAS token -- confirm against the SDK.
        self.token_type = b"servicebus.windows.net:sastoken"

    def get_token(self, *scopes, **kwargs):
        """
        This method is automatically called when token is about to expire.

        ``scopes`` and ``kwargs`` are accepted but not used.

        :return: The stored token string and expiry wrapped in an AccessToken.
        """
        sas_token, expires_on = self.token, self.expiry
        return AccessToken(sas_token, expires_on)
# The target namespace and event hub are read from the environment; a missing
# variable raises KeyError immediately.
FULLY_QUALIFIED_NAMESPACE = os.environ["EVENT_HUB_HOSTNAME"]
EVENTHUB_NAME = os.environ["EVENT_HUB_NAME"]

# The following part creates a SAS token. Users can use any way to create a SAS token.
SAS_POLICY = os.environ["EVENT_HUB_SAS_POLICY"]
SAS_KEY = os.environ["EVENT_HUB_SAS_KEY"]

uri = "sb://{}/{}".format(FULLY_QUALIFIED_NAMESPACE, EVENTHUB_NAME)
token_ttl = 3000  # seconds
# Snapshot the expiry BEFORE signing, as whole epoch seconds. The expiry
# embedded in the token is computed afterwards from the same TTL, so the value
# reported to the client can never be later than the token's real expiry.
token_expiry = int(time.time() + token_ttl)
sas_token = generate_sas_token(uri, SAS_POLICY, SAS_KEY, token_ttl)
# end of creating a SAS token

producer_client = EventHubProducerClient(
    fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE,
    eventhub_name=EVENTHUB_NAME,
    credential=CustomizedSASCredential(sas_token, token_expiry),
    logging_enable=True,
)

start_time = time.time()
# The context manager closes the client once the batch has been sent.
with producer_client:
    event_data_batch = producer_client.create_batch()
    event_data_batch.add(EventData("Single message"))
    producer_client.send_batch(event_data_batch)

print("Send messages in {} seconds.".format(time.time() - start_time))
|