File: directory_usage.py

package info (click to toggle)
python-azure-storage 20181109%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 76,472 kB
  • sloc: python: 28,724; makefile: 204; sh: 1
file content (162 lines) | stat: -rw-r--r-- 6,162 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import uuid

from azure.common import (
    AzureConflictHttpError,
    AzureMissingResourceHttpError,
)


class DirectorySamples():
    def __init__(self, account):
        self.account = account

    def run_all_samples(self):
        self.service = self.account.create_file_service()

        self.list_directories_and_files()
        self.create_directory()
        self.delete_directory()
        self.directory_metadata()
        self.directory_properties()
        self.directory_exists()

    def _get_resource_reference(self, prefix):
        return '{}{}'.format(prefix, str(uuid.uuid4()).replace('-', ''))

    def _get_directory_reference(self, prefix='dir'):
        return self._get_resource_reference(prefix)

    def _create_share(self, prefix='share'):
        share_name = self._get_resource_reference(prefix)
        self.service.create_share(share_name)
        return share_name

    def list_directories_and_files(self):
        share_name = self._create_share()

        self.service.create_directory(share_name, 'dir1')
        self.service.create_directory(share_name, 'dir2')

        self.service.create_file(share_name, 'dir1', 'file1', 512)
        self.service.create_file(share_name, 'dir1', 'file2', 512)

        self.service.create_file(share_name, None, 'rootfile', 512)

        # Basic
        # List from root
        root_file_dir = list(self.service.list_directories_and_files(share_name))
        for res in root_file_dir:
            print(res.name)  # dir1, dir2, rootfile

        # List from directory
        dir1 = list(self.service.list_directories_and_files(share_name, 'dir1'))
        for res in dir1:
            print(res.name)  # file1, file2

        # Num results
        root_file_dir = list(self.service.list_directories_and_files(share_name, num_results=2))
        for res in root_file_dir:
            print(res.name)  # dir1, dir2

    def create_directory(self):
        share_name = self._create_share()

        # Basic
        dir_name = self._get_directory_reference()
        created = self.service.create_directory(share_name, dir_name)  # True

        # Metadata
        metadata = {'val1': 'foo', 'val2': 'blah'}
        dir_name = self._get_directory_reference()
        created = self.service.create_directory(share_name, dir_name, metadata=metadata)  # True

        # Fail on exist
        dir_name = self._get_directory_reference()
        created = self.service.create_directory(share_name, dir_name)  # True
        created = self.service.create_directory(share_name, dir_name)  # False
        try:
            self.service.create_directory(share_name, dir_name, fail_on_exist=True)
        except AzureConflictHttpError:
            pass

        self.service.delete_share(share_name)

    def delete_directory(self):
        share_name = self._create_share()

        # Basic
        dir_name = self._get_directory_reference()
        deleted = self.service.delete_directory(share_name, dir_name)  # True

        # Fail not exist
        dir_name = self._get_directory_reference()
        deleted = self.service.delete_directory(share_name, dir_name)  # False
        try:
            self.service.delete_directory(share_name, dir_name, fail_not_exist=True)
        except AzureMissingResourceHttpError:
            pass

        self.service.delete_share(share_name)

    def directory_metadata(self):
        share_name = self._create_share()
        dir_name = self._get_directory_reference()
        self.service.create_directory(share_name, dir_name)
        metadata = {'val1': 'foo', 'val2': 'blah'}

        # Basic
        self.service.set_directory_metadata(share_name, dir_name, metadata=metadata)
        metadata = self.service.get_directory_metadata(share_name,
                                                       dir_name)  # metadata={'val1': 'foo', 'val2': 'blah'}

        # Replaces values, does not merge
        metadata = {'new': 'val'}
        self.service.set_directory_metadata(share_name, dir_name, metadata=metadata)
        metadata = self.service.get_directory_metadata(share_name, dir_name)  # metadata={'new': 'val'}

        # Capital letters
        metadata = {'NEW': 'VAL'}
        self.service.set_directory_metadata(share_name, dir_name, metadata=metadata)
        metadata = self.service.get_directory_metadata(share_name, dir_name)  # metadata={'new': 'VAL'}

        # Clearing
        self.service.set_directory_metadata(share_name, dir_name)
        metadata = self.service.get_directory_metadata(share_name, dir_name)  # metadata={}

        self.service.delete_share(share_name)

    def directory_properties(self):
        share_name = self._create_share()
        dir_name = self._get_directory_reference()
        self.service.create_directory(share_name, dir_name)
        metadata = {'val1': 'foo', 'val2': 'blah'}

        # Note that directory has no settable properties

        # Basic
        directory = self.service.get_directory_properties(share_name, dir_name)
        etag = directory.properties.etag  # etag string
        lmt = directory.properties.last_modified  # datetime object

        # Metadata
        self.service.set_directory_metadata(share_name, dir_name, metadata=metadata)
        directory = self.service.get_directory_properties(share_name, dir_name)
        metadata = directory.metadata  # metadata={'val1': 'foo', 'val2': 'blah'}

        self.service.delete_share(share_name)

    def directory_exists(self):
        share_name = self._create_share()
        directory_name = self._get_directory_reference()

        # Basic
        exists = self.service.exists(share_name, directory_name)  # False
        self.service.create_directory(share_name, directory_name)
        exists = self.service.exists(share_name, directory_name)  # True

        self.service.delete_share(share_name)