# coding: utf-8
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
"""
FILE: queue_samples_authentication_async.py

DESCRIPTION:
    These samples demonstrate authenticating a client via a connection string,
    a shared access key, a token credential from Azure Active Directory, or a
    generated SAS token, which can be passed as the credential parameter of
    any QueueServiceClient or QueueClient.

USAGE:
    python queue_samples_authentication_async.py

    Set the environment variables with your own values before running the sample:
    1) STORAGE_CONNECTION_STRING - the connection string to your storage account
    2) STORAGE_ACCOUNT_QUEUE_URL - the queue service account URL
    3) STORAGE_ACCOUNT_NAME - the name of the storage account
    4) STORAGE_ACCOUNT_KEY - the storage account access key
"""

from datetime import datetime, timedelta, timezone
import asyncio
import os
import sys


class QueueAuthSamplesAsync(object):

    connection_string = os.getenv("STORAGE_CONNECTION_STRING")
    account_url = os.getenv("STORAGE_ACCOUNT_QUEUE_URL")
    account_name = os.getenv("STORAGE_ACCOUNT_NAME")
    access_key = os.getenv("STORAGE_ACCOUNT_KEY")

    async def authentication_by_connection_string_async(self):
        if self.connection_string is None:
            print(
                "Missing required environment variable(s). Please see specific test for more details."
                + "\n"
                + "Test: authentication_by_connection_string_async"
            )
            sys.exit(1)

        # Instantiate a QueueServiceClient using a connection string
        # [START async_auth_from_connection_string]
        from azure.storage.queue.aio import QueueServiceClient

        queue_service = QueueServiceClient.from_connection_string(conn_str=self.connection_string)
        # [END async_auth_from_connection_string]

        # Get information for the Queue Service
        async with queue_service:
            properties = await queue_service.get_service_properties()

    async def authentication_by_shared_key_async(self):
        if self.account_url is None or self.access_key is None:
            print(
                "Missing required environment variable(s). Please see specific test for more details."
                + "\n"
                + "Test: authentication_by_shared_key_async"
            )
            sys.exit(1)

        # Instantiate a QueueServiceClient using a shared access key
        # [START async_create_queue_service_client]
        from azure.storage.queue.aio import QueueServiceClient

        queue_service = QueueServiceClient(account_url=self.account_url, credential=self.access_key)
        # [END async_create_queue_service_client]

        # Get information for the Queue Service
        async with queue_service:
            properties = await queue_service.get_service_properties()

    async def authentication_by_oauth_async(self):
        if self.account_url is None:
            print(
                "Missing required environment variable(s). Please see specific test for more details."
                + "\n"
                + "Test: authentication_by_oauth_async"
            )
            sys.exit(1)

        # [START async_create_queue_service_client_oauth]
        # Get a token credential for authentication
        from azure.identity.aio import DefaultAzureCredential

        token_credential = DefaultAzureCredential()

        # Instantiate a QueueServiceClient using a token credential
        from azure.storage.queue.aio import QueueServiceClient

        queue_service = QueueServiceClient(account_url=self.account_url, credential=token_credential)
        # [END async_create_queue_service_client_oauth]

        # Get information for the Queue Service.
        # The async credential holds its own transport, so close it along with the client.
        async with queue_service, token_credential:
            properties = await queue_service.get_service_properties()

    async def authentication_by_shared_access_signature_async(self):
        if (
            self.connection_string is None
            or self.account_name is None
            or self.access_key is None
            or self.account_url is None
        ):
            print(
                "Missing required environment variable(s). Please see specific test for more details."
                + "\n"
                + "Test: authentication_by_shared_access_signature_async"
            )
            sys.exit(1)

        # Instantiate a QueueServiceClient using a connection string
        from azure.storage.queue.aio import QueueServiceClient

        queue_service = QueueServiceClient.from_connection_string(conn_str=self.connection_string)

        # Create a SAS token to use for authentication of a client
        from azure.storage.queue import generate_account_sas, ResourceTypes, AccountSasPermissions

        sas_token = generate_account_sas(
            self.account_name,
            self.access_key,
            resource_types=ResourceTypes(service=True),
            permission=AccountSasPermissions(read=True),
            # Use a timezone-aware UTC timestamp; datetime.utcnow() is deprecated since Python 3.12
            expiry=datetime.now(timezone.utc) + timedelta(hours=1),
        )

        token_auth_queue_service = QueueServiceClient(account_url=self.account_url, credential=sas_token)

        # Get information for the Queue Service
        async with token_auth_queue_service:
            properties = await token_auth_queue_service.get_service_properties()


async def main():
    sample = QueueAuthSamplesAsync()
    await sample.authentication_by_connection_string_async()
    await sample.authentication_by_shared_key_async()
    await sample.authentication_by_oauth_async()
    await sample.authentication_by_shared_access_signature_async()


if __name__ == "__main__":
    asyncio.run(main())