File: file_samples_directory_async.py

package info (click to toggle)
python-azure 20250603%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (165 lines) | stat: -rw-r--r-- 6,294 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# coding: utf-8

# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

"""
FILE: file_samples_directory_async.py

DESCRIPTION:
    These samples demonstrate async directory operations like creating a directory
    or subdirectory, and working on files within the directories.

USAGE:
    python file_samples_directory_async.py

    Set the environment variables with your own values before running the sample:
    1) STORAGE_CONNECTION_STRING - the connection string to your storage account
"""

import asyncio
import os
import sys

# Directory containing this sample file; used to locate the sample data files.
current_dir = os.path.dirname(os.path.abspath(__file__))
# Path for a destination file next to this script (not referenced by the samples in this file).
DEST_FILE = os.path.join(current_dir, "SampleDestination.txt")
# Local file whose contents the samples upload to the share.
SOURCE_FILE = os.path.join(current_dir, "SampleSource.txt")


class DirectorySamplesAsync:
    """Async samples for directory operations on an Azure file share.

    Each sample creates its own share, runs its directory/file operations
    against it, and deletes the share again in a ``finally`` clause so the
    share is removed even when one of the operations raises.

    NOTE(review): the ``# [START ...]`` / ``# [END ...]`` comment pairs look
    like snippet-extraction markers for external docs -- presumably their names
    and the indentation of the code between them must stay stable; confirm
    before reformatting.
    """

    # Read once, when the class body is executed (i.e. at import time).
    connection_string = os.getenv('STORAGE_CONNECTION_STRING')

    def _require_connection_string(self, sample_name):
        """Return the configured connection string or exit the process.

        :param str sample_name: Name of the calling sample, echoed in the
            diagnostic so the user can see which sample was skipped.
        :returns: The non-empty connection string.
        :raises SystemExit: With exit code 1 when the connection string is
            missing or empty.
        """
        # ``os.getenv`` yields "" for a variable that is set but empty; treat
        # that like a missing variable instead of handing it to the SDK.
        if not self.connection_string:
            print(
                "Missing required environment variable: STORAGE_CONNECTION_STRING.\n"
                f"Test: {sample_name}"
            )
            sys.exit(1)
        return self.connection_string

    async def create_directory_and_file_async(self):
        """Create a directory, upload a file into it, then delete both.

        Works on the share ``directorysamples1async``, which is created at the
        start and deleted at the end.

        :raises SystemExit: With exit code 1 when no connection string is set.
        """
        connection_string = self._require_connection_string("create_directory_and_file_async")

        # Instantiate the ShareClient from a connection string.
        # Imported inside the method, so the SDK is only needed once a sample
        # actually runs.
        from azure.storage.fileshare.aio import ShareClient
        share = ShareClient.from_connection_string(connection_string, "directorysamples1async")

        # Create the share
        async with share:
            await share.create_share()

            try:
                # Get the directory client
                directory = share.get_directory_client(directory_path="mydirectory")

                # [START create_directory]
                await directory.create_directory()
                # [END create_directory]

                # [START upload_file_to_directory]
                # Upload a file to the directory
                with open(SOURCE_FILE, "rb") as source:
                    await directory.upload_file(file_name="sample", data=source)
                # [END upload_file_to_directory]

                # [START delete_file_in_directory]
                # Delete the file in the directory
                await directory.delete_file(file_name="sample")
                # [END delete_file_in_directory]

                # [START delete_directory]
                await directory.delete_directory()
                # [END delete_directory]

            finally:
                # Delete the share
                await share.delete_share()

    async def create_subdirectory_and_file_async(self):
        """Create a directory with a subdirectory, upload files and list them.

        Works on the share ``directorysamples2async``. A file named ``sample``
        is uploaded to both the parent directory and the subdirectory, the
        parent's entries are printed, and the subdirectory is deleted after its
        file has been removed. The share is deleted at the end.

        :raises SystemExit: With exit code 1 when no connection string is set.
        """
        connection_string = self._require_connection_string("create_subdirectory_and_file_async")

        # Instantiate the ShareClient from a connection string
        from azure.storage.fileshare.aio import ShareClient
        share = ShareClient.from_connection_string(connection_string, "directorysamples2async")

        # Create the share
        async with share:
            await share.create_share()

            try:
                # Get the directory client
                parent_dir = share.get_directory_client(directory_path="parentdir")

                # [START create_subdirectory]
                # Create the directory
                await parent_dir.create_directory()

                # Create a subdirectory
                subdir = await parent_dir.create_subdirectory("subdir")
                # [END create_subdirectory]

                # Upload a file to the parent directory
                with open(SOURCE_FILE, "rb") as source:
                    await parent_dir.upload_file(file_name="sample", data=source)

                # Upload a file to the subdirectory
                with open(SOURCE_FILE, "rb") as source:
                    await subdir.upload_file(file_name="sample", data=source)

                # [START lists_directory]
                # List the directories and files under the parent directory
                my_list = [item async for item in parent_dir.list_directories_and_files()]
                print(my_list)
                # [END lists_directory]

                # You must delete the file in the subdirectory before deleting the subdirectory
                await subdir.delete_file("sample")
                # [START delete_subdirectory]
                await parent_dir.delete_subdirectory("subdir")
                # [END delete_subdirectory]

            finally:
                # Delete the share
                await share.delete_share()

    async def get_subdirectory_client_async(self):
        """Create ``dir1`` and then ``dir1/dir2`` through a subdirectory client.

        Works on the share ``directorysamples3async``, which is created at the
        start and deleted at the end.

        :raises SystemExit: With exit code 1 when no connection string is set.
        """
        connection_string = self._require_connection_string("get_subdirectory_client_async")

        # Instantiate the ShareClient from a connection string
        from azure.storage.fileshare.aio import ShareClient
        share = ShareClient.from_connection_string(connection_string, "directorysamples3async")

        # Create the share
        async with share:
            await share.create_share()

            try:
                # [START get_subdirectory_client]
                # Get a directory client and create the directory
                parent = share.get_directory_client("dir1")
                await parent.create_directory()

                # Get a subdirectory client and create the subdirectory "dir1/dir2"
                subdirectory = parent.get_subdirectory_client("dir2")
                await subdirectory.create_directory()
                # [END get_subdirectory_client]
            finally:
                # Delete the share
                await share.delete_share()


async def main():
    """Run every directory sample in order on a single sample instance."""
    samples = DirectorySamplesAsync()
    # Awaited one after another: each sample finishes (including its share
    # cleanup) before the next one starts.
    for run_sample in (
        samples.create_directory_and_file_async,
        samples.create_subdirectory_and_file_async,
        samples.get_subdirectory_client_async,
    ):
        await run_sample()

# Run the samples only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    asyncio.run(main())