File: blob_samples_batch_delete_blobs.py

package info (click to toggle)
python-azure 20250603%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (49 lines) | stat: -rw-r--r-- 1,801 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from azure.core.exceptions import ResourceExistsError
from azure.storage.blob import BlobServiceClient
import os
import sys

"""
FILE: blob_samples_batch_delete_blobs.py
DESCRIPTION:
    This sample demonstrates batch deleting blobs from a container.
USAGE:
    python blob_samples_batch_delete_blobs.py
    Set the environment variables with your own values before running the sample:
    1) STORAGE_CONNECTION_STRING - the connection string to your storage account
"""

current_dir = os.path.dirname(os.path.abspath(__file__))
SOURCE_FOLDER = os.path.join(current_dir, "./sample-blobs/")


def batch_delete_blobs_sample(local_path):
    # Set the connection string and container name values to initialize the Container Client
    connection_string = os.getenv('STORAGE_CONNECTION_STRING')

    if connection_string is None:
        print("Missing required environment variable: STORAGE_CONNECTION_STRING." + '\n' +
              "Test: batch_delete_blobs_sample")
        sys.exit(1)

    blob_service_client = BlobServiceClient.from_connection_string(conn_str=connection_string)
    # Create a ContainerClient to use the batch_delete function on a Blob Container
    container_client = blob_service_client.get_container_client("mycontainername")
    try:
        container_client.create_container()
    except ResourceExistsError:
        pass
    # Upload blobs
    for filename in os.listdir(local_path):
        with open(local_path+filename, "rb") as data:
            container_client.upload_blob(name=filename, data=data, blob_type="BlockBlob")

    # List blobs in storage account
    blob_list = [b.name for b in list(container_client.list_blobs())]

    # Delete blobs
    container_client.delete_blobs(*blob_list)

if __name__ == '__main__':
    batch_delete_blobs_sample(SOURCE_FOLDER)