# coding: utf-8
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
"""
FILE: file_samples_share.py
DESCRIPTION:
These samples demonstrate share operations like creating a share snapshot,
setting share quota and metadata, listing directories and files in the
file share, and getting directory and file clients from a share client.
USAGE:
python file_samples_share.py
Set the environment variables with your own values before running the sample:
1) STORAGE_CONNECTION_STRING - the connection string to your storage account
"""
import os
import sys
from azure.storage.fileshare import ShareAccessTier
# Absolute directory that contains this script; the sample data files are
# resolved relative to it so the script can be launched from any working
# directory.
current_dir = os.path.split(os.path.abspath(__file__))[0]

# File uploaded by the directory-listing sample.
SOURCE_FILE = os.path.join(current_dir, "SampleSource.txt")
# Path for a downloaded copy of the sample data.
DEST_FILE = os.path.join(current_dir, "SampleDestination.txt")
class ShareSamples(object):
    """Runnable samples for share-level operations of ``azure.storage.fileshare``.

    Each public method is an independent sample. A sample that creates a
    share also deletes it again before returning, including when one of the
    demonstrated calls raises. When no connection string is configured, a
    sample prints a message and exits the process with status 1.

    The ``# [START ...]`` / ``# [END ...]`` comments delimit named snippets
    and are kept exactly as they were.
    """

    # Read once, when the class body is executed.
    connection_string = os.getenv('STORAGE_CONNECTION_STRING')

    def _require_connection_string(self, sample_name):
        """Exit with status 1 unless a connection string is configured.

        An empty value is treated the same as an unset variable, because it
        cannot be used to build a client either.

        :param str sample_name: Name of the calling sample, echoed in the message.
        :raises SystemExit: With code 1 when the connection string is missing or empty.
        """
        if not self.connection_string:
            print("Missing required environment variable: STORAGE_CONNECTION_STRING." + '\n' +
                  "Test: " + sample_name)
            sys.exit(1)

    def create_share_snapshot(self):
        """Create a share with the Hot access tier, snapshot it, then delete it."""
        self._require_connection_string("create_share_snapshot")

        # Instantiate the ShareClient from a connection string
        from azure.storage.fileshare import ShareClient
        share = ShareClient.from_connection_string(self.connection_string, "sharesamples1")

        # [START create_share]
        # Create share with Access Tier set to Hot
        share.create_share(access_tier=ShareAccessTier("Hot"))
        # [END create_share]
        try:
            # [START create_share_snapshot]
            share.create_snapshot()
            # [END create_share_snapshot]
        finally:
            # [START delete_share]
            share.delete_share(delete_snapshots=True)
            # [END delete_share]

    def set_share_quota_and_metadata(self):
        """Set a quota and metadata on a share, then read the metadata back."""
        self._require_connection_string("set_share_quota_and_metadata")

        # [START create_share_client_from_conn_string]
        from azure.storage.fileshare import ShareClient
        share = ShareClient.from_connection_string(self.connection_string, "sharesamples2")
        # [END create_share_client_from_conn_string]

        # Create the share
        share.create_share()
        try:
            # [START set_share_quota]
            # Set the quota for the share to 1GB
            share.set_share_quota(quota=1)
            # [END set_share_quota]

            # [START set_share_metadata]
            data = {'category': 'test'}
            share.set_share_metadata(metadata=data)
            # [END set_share_metadata]

            # Get the metadata for the share (kept only to show the call)
            props = share.get_share_properties().metadata
        finally:
            # Delete the share
            share.delete_share()

    def set_share_properties(self):
        """Set access tier and quota on two shares and print the resulting values."""
        self._require_connection_string("set_share_properties")

        from azure.storage.fileshare import ShareClient
        share1 = ShareClient.from_connection_string(self.connection_string, "sharesamples3a")
        share2 = ShareClient.from_connection_string(self.connection_string, "sharesamples3b")

        # Create the shares. If the second one cannot be created, remove the
        # first one again so that nothing is left behind.
        share1.create_share()
        try:
            share2.create_share()
        except Exception:
            share1.delete_share()
            raise

        try:
            # [START set_share_properties]
            # Set the tier for the first share to Hot
            share1.set_share_properties(access_tier="Hot")
            # Set the quota for the first share to 3
            share1.set_share_properties(quota=3)
            # Set the tier for the second share to Cool and quota to 2
            share2.set_share_properties(access_tier=ShareAccessTier("Cool"), quota=2)

            # Get the shares' properties
            print(share1.get_share_properties().access_tier)
            print(share1.get_share_properties().quota)
            print(share2.get_share_properties().access_tier)
            print(share2.get_share_properties().quota)
            # [END set_share_properties]
        finally:
            # Delete the shares; the nested finally makes sure the second
            # share is removed even if deleting the first one raises.
            try:
                share1.delete_share()
            finally:
                share2.delete_share()

    def list_directories_and_files(self):
        """Upload SOURCE_FILE into a new directory and print the directory listing."""
        self._require_connection_string("list_directories_and_files")

        # Instantiate the ShareClient from a connection string
        from azure.storage.fileshare import ShareClient
        share = ShareClient.from_connection_string(self.connection_string, "sharesamples4")

        # Create the share
        share.create_share()
        try:
            # [START share_list_files_in_dir]
            # Create a directory in the share
            dir_client = share.create_directory("mydir")

            # Upload a file to the directory
            with open(SOURCE_FILE, "rb") as source_file:
                dir_client.upload_file(file_name="sample", data=source_file)

            # List files in the directory
            my_files = list(share.list_directories_and_files(directory_name="mydir"))
            print(my_files)
            # [END share_list_files_in_dir]
        finally:
            # Delete the share
            share.delete_share()

    def get_directory_or_file_client(self):
        """Obtain directory and file clients from a share client.

        No share is created or deleted here; the method only calls the two
        client getters.
        """
        self._require_connection_string("get_directory_or_file_client")

        # Instantiate the ShareClient from a connection string
        from azure.storage.fileshare import ShareClient
        share = ShareClient.from_connection_string(self.connection_string, "sharesamples5")

        # Get the directory client to interact with a specific directory
        my_dir = share.get_directory_client("dir1")

        # Get the file client to interact with a specific file
        my_file = share.get_file_client("dir1/myfile")

    def acquire_share_lease(self):
        """Acquire a lease on a share and use it to read and delete the share."""
        self._require_connection_string("acquire_share_lease")

        # Instantiate the ShareClient from a connection string
        from azure.storage.fileshare import ShareClient
        share = ShareClient.from_connection_string(self.connection_string, "sharesamples")

        # Create the share
        share.create_share()

        lease = None
        share_deleted = False
        try:
            # [START acquire_and_release_lease_on_share]
            share.create_directory("mydir")
            lease = share.acquire_lease()
            share.get_share_properties(lease=lease)
            share.delete_share(lease=lease)
            # [END acquire_and_release_lease_on_share]
            share_deleted = True
        finally:
            # If the sample failed before the share was deleted, remove it
            # here. The lease is passed along only when one was acquired.
            if not share_deleted:
                if lease is None:
                    share.delete_share()
                else:
                    share.delete_share(lease=lease)
if __name__ == '__main__':
    # Run every sample once, in the order in which the class defines them.
    sample = ShareSamples()
    for run_sample in (
        sample.create_share_snapshot,
        sample.set_share_quota_and_metadata,
        sample.set_share_properties,
        sample.list_directories_and_files,
        sample.get_directory_or_file_client,
        sample.acquire_share_lease,
    ):
        run_sample()