File: blob_samples_common_async.py

package info (click to toggle)
python-azure 20250603%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (253 lines) | stat: -rw-r--r-- 10,933 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
# coding: utf-8

# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

"""
FILE: blob_samples_common_async.py
DESCRIPTION:
    This sample demonstrates common blob operations including creating snapshots, soft deleting and undeleting blobs,
    batch deleting blobs, acquiring a lease, and starting and aborting a copy from a URL.
USAGE:
    python blob_samples_common_async.py
    Set the environment variables with your own values before running the sample.
    1) STORAGE_CONNECTION_STRING_SOFT - the connection string to your storage account
"""

import os
import sys
import asyncio
from azure.core.exceptions import ResourceExistsError

# Resolve the sample data file relative to this script rather than the current
# working directory, so the samples can be launched from anywhere.
current_dir = os.path.split(os.path.abspath(__file__))[0]
SOURCE_FILE = os.path.join(current_dir, 'SampleSource.txt')


class CommonBlobSamplesAsync(object):
    """Async samples for common blob operations.

    Each public coroutine is a self-contained sample: it builds a
    ``BlobServiceClient`` from ``connection_string``, creates the container it
    works in, runs its operations and then deletes that container.

    The ``# [START name]`` / ``# [END name]`` comment pairs delimit named
    snippets (presumably extracted by documentation tooling -- confirm before
    renaming or moving them), so the code between them is kept as-is.
    """

    # Connection string of the storage account the samples run against.
    # STORAGE_CONNECTION_STRING_SOFT is tried first; STORAGE_CONNECTION_STRING
    # is accepted as a fallback when the former is unset or empty.
    connection_string = (
        os.getenv("STORAGE_CONNECTION_STRING_SOFT")
        or os.getenv("STORAGE_CONNECTION_STRING")
    )

    def _require_connection_string(self, test_name):
        """Return the configured connection string or terminate the process.

        :param str test_name: Name of the calling sample, echoed in the error
            message so the user can tell which sample was skipped.
        :returns: The non-empty connection string.
        :raises SystemExit: With status 1 when the connection string is unset
            or empty.
        """
        if not self.connection_string:
            print("Missing required environment variable: STORAGE_CONNECTION_STRING_SOFT "
                  "(or STORAGE_CONNECTION_STRING)." + '\n' +
                  "Test: " + test_name)
            sys.exit(1)
        return self.connection_string

    #--Begin Blob Samples-----------------------------------------------------------------

    async def blob_snapshots_async(self):
        """Create a snapshot of a blob, print its ID and delete only the snapshot.

        Exits the process with status 1 when no connection string is configured.
        """
        connection_string = self._require_connection_string("blob_snapshots_async")
        # Instantiate a BlobServiceClient using a connection string
        from azure.storage.blob.aio import BlobServiceClient
        blob_service_client = BlobServiceClient.from_connection_string(connection_string)

        # Instantiate a ContainerClient
        async with blob_service_client:
            container_client = blob_service_client.get_container_client("containerformyblobsasync")

            # Create new Container
            try:
                await container_client.create_container()
            except ResourceExistsError:
                # Container already created
                pass

            # Upload a blob to the container.
            # overwrite=True keeps a re-run working when an earlier, interrupted
            # run left the blob behind in the (tolerated) existing container.
            with open(SOURCE_FILE, "rb") as data:
                await container_client.upload_blob(name="my_blob", data=data, overwrite=True)

            # Get a BlobClient for a specific blob
            blob_client = blob_service_client.get_blob_client(container="containerformyblobsasync", blob="my_blob")

            # [START create_blob_snapshot]
            # Create a read-only snapshot of the blob at this point in time
            snapshot_blob = await blob_client.create_snapshot()

            # Get the snapshot ID
            print(snapshot_blob.get('snapshot'))

            # Delete only the snapshot (blob itself is retained)
            await blob_client.delete_blob(delete_snapshots="only")
            # [END create_blob_snapshot]

            # Delete container
            await blob_service_client.delete_container("containerformyblobsasync")

    async def soft_delete_and_undelete_blob_async(self):
        """Enable a delete-retention policy, soft delete a blob and undelete it.

        Note that this sample changes the service properties of the storage
        account (it sets a one-day delete retention policy).

        Exits the process with status 1 when no connection string is configured.
        """
        connection_string = self._require_connection_string("soft_delete_and_undelete_blob_async")
        # Instantiate a BlobServiceClient using a connection string
        from azure.storage.blob.aio import BlobServiceClient
        blob_service_client = BlobServiceClient.from_connection_string(connection_string)

        async with blob_service_client:
            # Create a retention policy to retain deleted blobs
            from azure.storage.blob import RetentionPolicy
            delete_retention_policy = RetentionPolicy(enabled=True, days=1)

            # Set the retention policy on the service
            await blob_service_client.set_service_properties(delete_retention_policy=delete_retention_policy)

            # Instantiate a ContainerClient
            container_client = blob_service_client.get_container_client("containerfordeletedblobsasync")

            # Create new Container
            try:
                await container_client.create_container()
            except ResourceExistsError:
                # Container already created
                pass

            # Upload a blob to the container; upload_blob hands back the
            # BlobClient used for the rest of the sample.
            # overwrite=True keeps a re-run working when an earlier, interrupted
            # run left the blob behind in the (tolerated) existing container.
            with open(SOURCE_FILE, "rb") as data:
                blob_client = await container_client.upload_blob(name="my_blob", data=data, overwrite=True)

            # Soft delete blob in the container (blob can be recovered with undelete)
            await blob_client.delete_blob()

            # [START undelete_blob]
            # Undelete the blob before the retention policy expires
            await blob_client.undelete_blob()
            # [END undelete_blob]

            # [START get_blob_properties]
            properties = await blob_client.get_blob_properties()
            # [END get_blob_properties]

            # Delete container
            await blob_service_client.delete_container("containerfordeletedblobsasync")

    async def delete_multiple_blobs_async(self):
        """Upload three blobs and delete them in batches, by name and by listing.

        Exits the process with status 1 when no connection string is configured.
        """
        connection_string = self._require_connection_string("delete_multiple_blobs_async")
        # Instantiate a BlobServiceClient using a connection string
        from azure.storage.blob.aio import BlobServiceClient
        blob_service_client = BlobServiceClient.from_connection_string(connection_string)

        async with blob_service_client:
            # Instantiate a ContainerClient
            container_client = blob_service_client.get_container_client("containerforbatchblobdeletesasync")

            # Create new Container
            try:
                await container_client.create_container()
            except ResourceExistsError:
                # Container already created
                pass

            # Upload blobs to the container.
            # overwrite=True keeps a re-run working when an earlier, interrupted
            # run left blobs behind in the (tolerated) existing container.
            upload_data = b"Hello World"
            await container_client.upload_blob(name="my_blob1", data=upload_data, overwrite=True)
            await container_client.upload_blob(name="my_blob2", data=upload_data, overwrite=True)
            await container_client.upload_blob(name="my_blob3", data=upload_data, overwrite=True)

            # [START delete_multiple_blobs]
            # Delete multiple blobs in the container by name
            await container_client.delete_blobs("my_blob1", "my_blob2")

            # Delete multiple blobs by properties iterator
            my_blobs = container_client.list_blobs(name_starts_with="my_blob")
            await container_client.delete_blobs(*[b async for b in my_blobs])  # async for in list comprehension after 3.6 only
            # [END delete_multiple_blobs]

            # Delete container
            await blob_service_client.delete_container("containerforbatchblobdeletesasync")

    async def acquire_lease_on_blob_async(self):
        """Acquire a lease on a blob and delete the blob using that lease.

        Exits the process with status 1 when no connection string is configured.
        """
        connection_string = self._require_connection_string("acquire_lease_on_blob_async")
        # Instantiate a BlobServiceClient using a connection string
        from azure.storage.blob.aio import BlobServiceClient
        blob_service_client = BlobServiceClient.from_connection_string(connection_string)

        async with blob_service_client:
            # Instantiate a ContainerClient
            container_client = blob_service_client.get_container_client("leasemyblobscontainerasync")

            # Create new Container
            try:
                await container_client.create_container()
            except ResourceExistsError:
                # Container already created
                pass

            # Upload a blob to the container.
            # overwrite=True keeps a re-run working when an earlier, interrupted
            # run left the blob behind in the (tolerated) existing container.
            with open(SOURCE_FILE, "rb") as data:
                await container_client.upload_blob(name="my_blob", data=data, overwrite=True)

            # [START acquire_lease_on_blob]
            # Get the blob client
            blob_client = blob_service_client.get_blob_client("leasemyblobscontainerasync", "my_blob")

            # Acquire a lease on the blob
            lease = await blob_client.acquire_lease()

            # Delete blob by passing in the lease
            await blob_client.delete_blob(lease=lease)
            # [END acquire_lease_on_blob]

            # Delete container
            await blob_service_client.delete_container("leasemyblobscontainerasync")

    async def start_copy_blob_from_url_and_abort_copy_async(self):
        """Start a server-side copy from a public URL and abort it if unfinished.

        The copy status is printed right after the copy is started and again
        after the (conditional) abort. The container is deleted even when one
        of the copy steps raises.

        Exits the process with status 1 when no connection string is configured.
        """
        connection_string = self._require_connection_string("start_copy_blob_from_url_and_abort_copy_async")
        # Instantiate a BlobServiceClient using a connection string
        from azure.storage.blob.aio import BlobServiceClient
        blob_service_client = BlobServiceClient.from_connection_string(connection_string)

        async with blob_service_client:
            # Instantiate a ContainerClient
            container_client = blob_service_client.get_container_client("copyblobcontainerasync")

            # Create new Container
            try:
                await container_client.create_container()
            except ResourceExistsError:
                # Container already created
                pass

            try:
                # [START copy_blob_from_url]
                # Get the blob client with the source blob
                source_blob = "https://www.gutenberg.org/files/59466/59466-0.txt"
                copied_blob = blob_service_client.get_blob_client("copyblobcontainerasync", '59466-0.txt')

                # start copy and check copy status
                copy = await copied_blob.start_copy_from_url(source_blob)
                props = await copied_blob.get_blob_properties()
                print(props.copy.status)
                # [END copy_blob_from_url]

                copy_id = props.copy.id
                # [START abort_copy_blob_from_url]
                # Passing in copy id to abort copy operation
                if props.copy.status != "success":
                    if copy_id is not None:
                        await copied_blob.abort_copy(copy_id)
                    else:
                        print("copy_id was unexpectedly None, check if the operation completed successfully.")

                # check copy status
                props = await copied_blob.get_blob_properties()
                print(props.copy.status)
                # [END abort_copy_blob_from_url]

            finally:
                await blob_service_client.delete_container("copyblobcontainerasync")

async def main():
    """Run every sample of CommonBlobSamplesAsync sequentially, in a fixed order."""
    samples = CommonBlobSamplesAsync()
    ordered_samples = (
        samples.blob_snapshots_async,
        samples.soft_delete_and_undelete_blob_async,
        samples.delete_multiple_blobs_async,
        samples.acquire_lease_on_blob_async,
        samples.start_copy_blob_from_url_and_abort_copy_async,
    )
    # Awaited one at a time: each sample finishes before the next one starts.
    for run_sample in ordered_samples:
        await run_sample()

if __name__ == "__main__":
    # Script entry point: drive the async samples to completion.
    entry_point = main()
    asyncio.run(entry_point)