File: share_usage.py

package info (click to toggle)
python-azure-storage 20181109%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 76,472 kB
  • sloc: python: 28,724; makefile: 204; sh: 1
file content (199 lines) | stat: -rw-r--r-- 7,188 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import time
import uuid

from azure.common import (
    AzureConflictHttpError,
    AzureMissingResourceHttpError,
)

from azure.storage.common import (
    Metrics,
    CorsRule,
)


class ShareSamples():
    def __init__(self, account):
        self.account = account

    def run_all_samples(self):
        self.service = self.account.create_file_service()

        self.create_share()
        self.delete_share()
        self.share_metadata()
        self.share_properties()
        self.share_stats()
        self.share_exists()

        self.list_shares()

        # This method contains sleeps, so don't run by default
        # self.service_properties()

    def _get_share_reference(self, prefix='share'):
        return '{}{}'.format(prefix, str(uuid.uuid4()).replace('-', ''))

    def _create_share(self, prefix='share'):
        share_name = self._get_share_reference(prefix)
        self.service.create_share(share_name)
        return share_name

    def create_share(self):
        # Basic
        share_name1 = self._get_share_reference()
        created = self.service.create_share(share_name1)  # True

        # Quota
        share_name2 = self._get_share_reference()
        created = self.service.create_share(share_name2, quota=1)  # True

        # Metadata
        metadata = {'val1': 'foo', 'val2': 'blah'}
        share_name3 = self._get_share_reference()
        created = self.service.create_share(share_name3, metadata=metadata)  # True

        # Fail on exist
        share_name4 = self._get_share_reference()
        created = self.service.create_share(share_name4)  # True
        created = self.service.create_share(share_name4)  # False
        try:
            self.service.create_share(share_name4, fail_on_exist=True)
        except AzureConflictHttpError:
            pass

        self.service.delete_share(share_name1)
        self.service.delete_share(share_name2)
        self.service.delete_share(share_name3)
        self.service.delete_share(share_name4)

    def delete_share(self):
        # Basic
        share_name = self._create_share()
        deleted = self.service.delete_share(share_name)  # True

        # Fail not exist
        share_name = self._get_share_reference()
        deleted = self.service.delete_share(share_name)  # False
        try:
            self.service.delete_share(share_name, fail_not_exist=True)
        except AzureMissingResourceHttpError:
            pass

    def share_metadata(self):
        share_name = self._create_share()
        metadata = {'val1': 'foo', 'val2': 'blah'}

        # Basic
        self.service.set_share_metadata(share_name, metadata=metadata)
        metadata = self.service.get_share_metadata(share_name)  # metadata={'val1': 'foo', 'val2': 'blah'}

        # Replaces values, does not merge
        metadata = {'new': 'val'}
        self.service.set_share_metadata(share_name, metadata=metadata)
        metadata = self.service.get_share_metadata(share_name)  # metadata={'new': 'val'}

        # Capital letters
        metadata = {'NEW': 'VAL'}
        self.service.set_share_metadata(share_name, metadata=metadata)
        metadata = self.service.get_share_metadata(share_name)  # metadata={'new': 'VAL'}

        # Clearing
        self.service.set_share_metadata(share_name)
        metadata = self.service.get_share_metadata(share_name)  # metadata={}

        self.service.delete_share(share_name)

    def share_properties(self):
        share_name = self._create_share()
        metadata = {'val1': 'foo', 'val2': 'blah'}

        # Basic
        # Sets the share quota to 1 GB
        self.service.set_share_properties(share_name, 1)
        share = self.service.get_share_properties(share_name)
        quota = share.properties.quota  # 1

        # Metadata
        self.service.set_share_metadata(share_name, metadata=metadata)
        share = self.service.get_share_properties(share_name)
        metadata = share.metadata  # metadata={'val1': 'foo', 'val2': 'blah'}

        self.service.delete_share(share_name)

    def share_stats(self):
        share_name = self._create_share()
        self.service.create_file_from_text(share_name, None, 'file1', b'hello world')

        # Basic
        share_usage = self.service.get_share_stats(share_name)  # 1

        self.service.delete_share(share_name)

    def share_exists(self):
        share_name = self._get_share_reference()

        # Basic
        exists = self.service.exists(share_name)  # False
        self.service.create_share(share_name)
        exists = self.service.exists(share_name)  # True

        self.service.delete_share(share_name)

    def list_shares(self):
        share_name1 = self._get_share_reference()
        self.service.create_share('share1', metadata={'val1': 'foo', 'val2': 'blah'})

        share_name2 = self._create_share('share2')
        share_name3 = self._create_share('thirdshare')

        # Basic
        # Commented out as this will list every share in your account
        # shares = list(self.service.list_shares())
        # for share in shares:
        #    print(share.name) # share1, share2, thirdq, all other shares created in the service        

        # Num results
        # Will return in alphabetical order. 
        shares = list(self.service.list_shares(num_results=2))
        for share in shares:
            print(share.name)  # share1, share2

        # Prefix
        shares = list(self.service.list_shares(prefix='share'))
        for share in shares:
            print(share.name)  # share1, share2

        # Metadata
        shares = list(self.service.list_shares(prefix='share', include_metadata=True))
        share = next((q for q in shares if q.name == 'share1'), None)
        metadata = share.metadata  # {'val1': 'foo', 'val2': 'blah'}

        self.service.delete_share(share_name1)
        self.service.delete_share(share_name2)
        self.service.delete_share(share_name3)

    def service_properties(self):
        # Basic
        self.service.set_file_service_properties(hour_metrics=Metrics(enabled=True, include_apis=True),
                                                 minute_metrics=Metrics(enabled=True, include_apis=False),
                                                 cors=[CorsRule(allowed_origins=['*'], allowed_methods=['GET'])])

        # Wait 30 seconds for settings to propagate
        time.sleep(30)

        props = self.service.get_file_service_properties()  # props = ServiceProperties() w/ all properties specified above

        # Omitted properties will not overwrite what's already on the self.service
        # Empty properties will clear
        self.service.set_file_service_properties(cors=[])

        # Wait 30 seconds for settings to propagate
        time.sleep(30)

        props = self.service.get_file_service_properties()  # props = ServiceProperties() w/ CORS rules cleared