File: file_samples_authentication_async.py

package info (click to toggle)
python-azure 20250603%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (150 lines) | stat: -rw-r--r-- 6,704 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# coding: utf-8

# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

"""
FILE: file_samples_authentication_async.py

DESCRIPTION:
    These samples demonstrate authenticating a client via a connection string,
    shared access key, or by generating a sas token with which the returned signature
    can be used with the credential parameter of any ShareServiceClient,
    ShareClient, ShareDirectoryClient, or ShareFileClient.

USAGE:
    python file_samples_authentication_async.py

    Set the environment variables with your own values before running the sample:
    1) STORAGE_CONNECTION_STRING - the connection string to your storage account
    2) STORAGE_ACCOUNT_FILE_SHARE_URL - the queue service account URL
    3) STORAGE_ACCOUNT_NAME - the name of the storage account
    4) STORAGE_ACCOUNT_KEY - the storage account access key
"""

import asyncio
import os
import sys
from datetime import datetime, timedelta

current_dir = os.path.dirname(os.path.abspath(__file__))
DEST_FILE = os.path.join(current_dir, "SampleDestination.txt")

class FileAuthSamplesAsync(object):

    connection_string = os.getenv("STORAGE_CONNECTION_STRING")

    account_url = os.getenv("STORAGE_ACCOUNT_FILE_SHARE_URL")
    account_name = os.getenv("STORAGE_ACCOUNT_NAME")
    access_key = os.getenv("STORAGE_ACCOUNT_KEY")

    async def authentication_connection_string_async(self):
        if self.connection_string is None:
            print("Missing required environment variable: STORAGE_CONNECTION_STRING." + '\n' +
                  "Test: authentication_connection_string_async")
            sys.exit(1)

        # Instantiate the ShareServiceClient from a connection string
        # [START create_share_service_client_from_conn_string]
        from azure.storage.fileshare.aio import ShareServiceClient
        file_service = ShareServiceClient.from_connection_string(self.connection_string)
        # [END create_share_service_client_from_conn_string]

    async def authentication_shared_access_key_async(self):
        if self.account_url is None:
            print("Missing required environment variable: STORAGE_ACCOUNT_FILE_SHARE_URL." + '\n' +
                  "Test: authentication_shared_access_key_async")
            sys.exit(1)

        if self.access_key is None:
            print("Missing required environment variable: STORAGE_ACCOUNT_KEY." + '\n' +
                  "Test: authentication_shared_access_key_async")
            sys.exit(1)

        # Instantiate a ShareServiceClient using a shared access key
        # [START create_share_service_client]
        from azure.storage.fileshare.aio import ShareServiceClient
        share_service_client = ShareServiceClient(
            account_url=self.account_url,
            credential=self.access_key
        )
        # [END create_share_service_client]

    async def authentication_shared_access_signature_async(self):
        if self.connection_string is None:
            print("Missing required environment variable: STORAGE_CONNECTION_STRING." + '\n' +
                  "Test: authentication_shared_access_signature_async")
            sys.exit(1)

        if self.account_name is None:
            print("Missing required environment variable: STORAGE_ACCOUNT_NAME." + '\n' +
                  "Test: authentication_shared_access_signature_async")
            sys.exit(1)

        if self.access_key is None:
            print("Missing required environment variable: STORAGE_ACCOUNT_KEY." + '\n' +
                  "Test: authentication_shared_access_signature_async")
            sys.exit(1)

        # Instantiate a ShareServiceClient using a connection string
        from azure.storage.fileshare.aio import ShareServiceClient
        share_service_client = ShareServiceClient.from_connection_string(self.connection_string)

        # Create a SAS token to use to authenticate a new client
        from azure.storage.fileshare import generate_account_sas, ResourceTypes, AccountSasPermissions

        sas_token = generate_account_sas(
            self.account_name,
            self.access_key,
            resource_types=ResourceTypes(service=True),
            permission=AccountSasPermissions(read=True),
            expiry=datetime.utcnow() + timedelta(hours=1)
        )

    async def authentication_default_azure_credential_async(self):
        if self.account_url is None:
            print("Missing required environment variable: STORAGE_ACCOUNT_FILE_SHARE_URL." + '\n' +
                  "Test: authentication_default_azure_credential_async")
            sys.exit(1)

        # [START file_share_oauth]
        # Get a credential for authentication
        # DefaultAzureCredential attempts a chained set of authentication methods.
        # See documentation here: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/identity/azure-identity
        from azure.identity.aio import DefaultAzureCredential
        default_credential = DefaultAzureCredential()

        # Instantiate a ShareServiceClient using a token credential and token_intent
        from azure.storage.fileshare.aio import ShareServiceClient
        share_service_client = ShareServiceClient(
            account_url=self.account_url,
            credential=default_credential,
            # When using a token credential, you MUST also specify a token_intent
            token_intent='backup'
        )
        # Only Directory and File operations, and a certain few Share operations, are currently supported for OAuth.
        # Create a ShareFileClient from the ShareServiceClient.
        share_client = share_service_client.get_share_client("myshareasync")
        await share_client.create_share()
        await share_client.create_directory('mydirectory')
        directory_client = share_client.get_directory_client('mydirectory')
        with open(DEST_FILE, "wb") as data:
            await directory_client.upload_file('myfile', data=data)
        share_file_client = directory_client.get_file_client('myfile')

        properties = await share_file_client.get_file_properties()
        # [END file_share_oauth]


async def main():
    sample = FileAuthSamplesAsync()
    await sample.authentication_connection_string_async()
    await sample.authentication_shared_access_key_async()
    await sample.authentication_shared_access_signature_async()
    await sample.authentication_default_azure_credential_async()

if __name__ == '__main__':
    asyncio.run(main())