File: queue_samples_authentication_async.py

package info (click to toggle)
python-azure 20250603%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (150 lines) | stat: -rw-r--r-- 5,959 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# coding: utf-8

# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

"""
FILE: queue_samples_authentication_async.py

DESCRIPTION:
    These samples demonstrate authenticating a client via a connection string,
    shared access key, token credential from Azure Active Directory, or by
    generating a sas token with which the returned signature can be used with
    the credential parameter of any QueueServiceClient or QueueClient.

USAGE:
    python queue_samples_authentication_async.py

    Set the environment variables with your own values before running the sample:
    1) STORAGE_CONNECTION_STRING - the connection string to your storage account
    2) STORAGE_ACCOUNT_QUEUE_URL - the queue service account URL
    3) STORAGE_ACCOUNT_NAME - the name of the storage account
    4) STORAGE_ACCOUNT_KEY - the storage account access key
"""


from datetime import datetime, timedelta
import asyncio
import os
import sys


class QueueAuthSamplesAsync(object):

    connection_string = os.getenv("STORAGE_CONNECTION_STRING")
    account_url = os.getenv("STORAGE_ACCOUNT_QUEUE_URL")
    account_name = os.getenv("STORAGE_ACCOUNT_NAME")
    access_key = os.getenv("STORAGE_ACCOUNT_KEY")

    async def authentication_by_connection_string_async(self):
        if self.connection_string is None:
            print(
                "Missing required environment variable(s). Please see specific test for more details."
                + "\n"
                + "Test: authentication_by_connection_string_async"
            )
            sys.exit(1)

        # Instantiate a QueueServiceClient using a connection string
        # [START async_auth_from_connection_string]
        from azure.storage.queue.aio import QueueServiceClient

        queue_service = QueueServiceClient.from_connection_string(conn_str=self.connection_string)
        # [END async_auth_from_connection_string]

        # Get information for the Queue Service
        async with queue_service:
            properties = await queue_service.get_service_properties()

    async def authentication_by_shared_key_async(self):
        if self.account_url is None or self.access_key is None:
            print(
                "Missing required environment variable(s). Please see specific test for more details."
                + "\n"
                + "Test: authentication_by_shared_key_async"
            )
            sys.exit(1)

        # Instantiate a QueueServiceClient using a shared access key
        # [START async_create_queue_service_client]
        from azure.storage.queue.aio import QueueServiceClient

        queue_service = QueueServiceClient(account_url=self.account_url, credential=self.access_key)
        # [END async_create_queue_service_client]
        # Get information for the Queue Service
        async with queue_service:
            properties = await queue_service.get_service_properties()

    async def authentication_by_oauth_async(self):
        if self.account_url is None:
            print(
                "Missing required environment variable(s). Please see specific test for more details."
                + "\n"
                + "Test: authentication_by_oauth"
            )
            sys.exit(1)

        # [START async_create_queue_service_client_oauth]
        # Get a token credential for authentication
        from azure.identity.aio import DefaultAzureCredential

        token_credential = DefaultAzureCredential()
        # Instantiate a QueueServiceClient using a token credential
        from azure.storage.queue.aio import QueueServiceClient

        queue_service = QueueServiceClient(account_url=self.account_url, credential=token_credential)
        # [END async_create_queue_service_client_oauth]

        # Get information for the Queue Service
        async with queue_service:
            properties = await queue_service.get_service_properties()

    async def authentication_by_shared_access_signature_async(self):
        if (
            self.connection_string is None
            or self.account_name is None
            or self.access_key is None
            or self.account_url is None
        ):
            print(
                "Missing required environment variable(s). Please see specific test for more details."
                + "\n"
                + "Test: authentication_by_shared_access_signature_async"
            )
            sys.exit(1)

        # Instantiate a QueueServiceClient using a connection string
        from azure.storage.queue.aio import QueueServiceClient

        queue_service = QueueServiceClient.from_connection_string(conn_str=self.connection_string)

        # Create a SAS token to use for authentication of a client
        from azure.storage.queue import generate_account_sas, ResourceTypes, AccountSasPermissions

        sas_token = generate_account_sas(
            self.account_name,
            self.access_key,
            resource_types=ResourceTypes(service=True),
            permission=AccountSasPermissions(read=True),
            expiry=datetime.utcnow() + timedelta(hours=1),
        )
        token_auth_queue_service = QueueServiceClient(account_url=self.account_url, credential=sas_token)

        # Get information for the Queue Service
        async with token_auth_queue_service:
            properties = await token_auth_queue_service.get_service_properties()


async def main():
    sample = QueueAuthSamplesAsync()
    await sample.authentication_by_connection_string_async()
    await sample.authentication_by_shared_key_async()
    await sample.authentication_by_oauth_async()
    await sample.authentication_by_shared_access_signature_async()


if __name__ == "__main__":
    asyncio.run(main())