File: test_subvolume.py

package info (click to toggle)
python-azure 20230112%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 749,544 kB
  • sloc: python: 6,815,827; javascript: 287; makefile: 195; xml: 109; sh: 105
file content (164 lines) | stat: -rw-r--r-- 7,797 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
from azure.mgmt.netapp.models import SubvolumeInfo, SubvolumePatchRequest
from devtools_testutils import AzureMgmtRecordedTestCase, recorded_by_proxy, set_bodiless_matcher
from test_volume import create_volume, delete_volume, delete_pool, delete_account, create_virtual_network
from setup import *
import azure.mgmt.netapp.models
import time

def wait_for_subvolume_created(client, rg, account_name, pool_name, volume_name, subvolume_name, live=False):
    """Poll a subvolume until its provisioning state is "Succeeded".

    The subvolume is fetched through ``client.subvolumes.get`` at most 40
    times. When ``live`` is true, a 5 second sleep precedes every fetch;
    otherwise the fetches happen back-to-back.

    The function returns ``None`` in every case: it stops early once the
    state is "Succeeded" and gives up silently (no exception) when all
    40 polls are used without reaching that state.
    """
    max_polls = 40
    for _ in range(max_polls):
        if live:
            time.sleep(5)
        current = client.subvolumes.get(rg, account_name, pool_name, volume_name, subvolume_name)
        if current.provisioning_state == "Succeeded":
            return


class TestNetAppSubvolume(AzureMgmtRecordedTestCase):
    """Recorded tests for the subvolume operations of the NetApp management client.

    Each test creates its own account/pool/volume (with subvolumes enabled),
    exercises ``self.client.subvolumes`` and then deletes what it created.
    """

    def setup_method(self, method):
        """Create the NetApp management client and, for live runs, a network client."""
        self.client = self.create_mgmt_client(azure.mgmt.netapp.NetAppManagementClient)
        if self.is_live:
            # Only imported for live runs, the only mode in which
            # self.network_client is used by the tests below.
            from azure.mgmt.network import NetworkManagementClient
            self.network_client = self.create_mgmt_client(NetworkManagementClient)

    # Before tests are run live a resource group needs to be created along with vnet and subnet
    # Note that when tests are run in live mode it is best to run one test at a time.
    @recorded_by_proxy
    def test_crud_subvolumes(self):
        """Create, update, get and delete one subvolume, checking its name and path."""
        set_bodiless_matcher()
        ACCOUNT1 = self.get_resource_name(TEST_ACC_1+"-")
        volumeName1 = self.get_resource_name(TEST_VOL_1+"-")
        VNETNAME = self.get_resource_name(VNET+"-")
        if self.is_live:
            create_virtual_network(self.network_client, TEST_RG, LOCATION, VNETNAME, 'default')
        create_volume(self.client, TEST_RG, ACCOUNT1, TEST_POOL_1, volumeName1, LOCATION, vnet=VNETNAME, enable_subvolumes="Enabled")

        # Name the tests expect the service to report for the subvolume.
        expected_name = ACCOUNT1 + "/" + TEST_POOL_1 + "/" + volumeName1 + "/" + TEST_SUBVOLUME_1

        path = "/sub_vol_1.txt"
        size = 1000000
        subvolume_body = SubvolumeInfo(
            path=path,
            size=size
        )

        # create
        subvolume_info = self.client.subvolumes.begin_create(
            TEST_RG, ACCOUNT1, TEST_POOL_1, volumeName1, TEST_SUBVOLUME_1, subvolume_body).result()
        assert subvolume_info.name == expected_name
        assert subvolume_info.path == path

        # update
        path = "/sub_vol_update.txt"
        size = 2000000
        subvolume_patch = SubvolumePatchRequest(
            path=path,
            size=size,
        )
        subvolume_info = self.client.subvolumes.begin_update(
            TEST_RG, ACCOUNT1, TEST_POOL_1, volumeName1, TEST_SUBVOLUME_1, subvolume_patch).result()
        assert subvolume_info.name == expected_name
        # Compare with ==; `assert a, b` would only check that `a` is truthy
        # and use `b` as the failure message.
        assert subvolume_info.path == path

        # get
        subvolume_info = self.client.subvolumes.get(TEST_RG, ACCOUNT1, TEST_POOL_1, volumeName1, TEST_SUBVOLUME_1)
        assert subvolume_info.name == expected_name
        assert subvolume_info.path == path

        # delete
        self.client.subvolumes.begin_delete(TEST_RG, ACCOUNT1, TEST_POOL_1, volumeName1, TEST_SUBVOLUME_1).wait()
        subvolume_list = self.client.subvolumes.list_by_volume(TEST_RG, ACCOUNT1, TEST_POOL_1, volumeName1)
        assert len(list(subvolume_list)) == 0

        # clean up
        delete_volume(self.client, TEST_RG, ACCOUNT1, TEST_POOL_1, volumeName1, live=self.is_live)
        delete_pool(self.client, TEST_RG, ACCOUNT1, TEST_POOL_1, live=self.is_live)
        delete_account(self.client, TEST_RG, ACCOUNT1, live=self.is_live)
        if self.is_live:
            self.network_client.virtual_networks.begin_delete(TEST_RG, VNETNAME)

    @recorded_by_proxy
    def test_list_by_volume(self):
        """Create two subvolumes and check the list length as each one is deleted."""
        set_bodiless_matcher()
        ACCOUNT1 = self.get_resource_name(TEST_ACC_1+"-")
        volumeName1 = self.get_resource_name(TEST_VOL_1+"-")
        VNETNAME = self.get_resource_name(VNET+"-")
        if self.is_live:
            create_virtual_network(self.network_client, TEST_RG, LOCATION, VNETNAME, 'default')
        create_volume(self.client, TEST_RG, ACCOUNT1, TEST_POOL_1, volumeName1, LOCATION, vnet=VNETNAME, enable_subvolumes="Enabled")

        path1 = "/sub_vol_1.txt"
        size1 = 1000000
        subvolume_body1 = SubvolumeInfo(
            path=path1,
            size=size1
        )

        path2 = "/sub_vol_2.txt"
        size2 = 2000000
        subvolume_body2 = SubvolumeInfo(
            path=path2,
            size=size2
        )

        # create
        self.client.subvolumes.begin_create(TEST_RG, ACCOUNT1, TEST_POOL_1, volumeName1, TEST_SUBVOLUME_1, subvolume_body1)
        # Pass the live flag so that live runs sleep between polls instead of
        # issuing all polls back-to-back; non-live runs are unaffected.
        wait_for_subvolume_created(
            self.client, TEST_RG, ACCOUNT1, TEST_POOL_1, volumeName1, TEST_SUBVOLUME_1, live=self.is_live)
        # NOTE(review): the second create is not waited on before listing;
        # left as-is so the request sequence stays unchanged.
        self.client.subvolumes.begin_create(TEST_RG, ACCOUNT1, TEST_POOL_1, volumeName1, TEST_SUBVOLUME_2, subvolume_body2)

        # list_by_volume
        subvolume_list = self.client.subvolumes.list_by_volume(TEST_RG, ACCOUNT1, TEST_POOL_1, volumeName1)
        assert len(list(subvolume_list)) == 2

        self.client.subvolumes.begin_delete(TEST_RG, ACCOUNT1, TEST_POOL_1, volumeName1, TEST_SUBVOLUME_1).wait()
        subvolume_list = self.client.subvolumes.list_by_volume(TEST_RG, ACCOUNT1, TEST_POOL_1, volumeName1)
        assert len(list(subvolume_list)) == 1

        self.client.subvolumes.begin_delete(TEST_RG, ACCOUNT1, TEST_POOL_1, volumeName1, TEST_SUBVOLUME_2).wait()
        subvolume_list = self.client.subvolumes.list_by_volume(TEST_RG, ACCOUNT1, TEST_POOL_1, volumeName1)
        assert len(list(subvolume_list)) == 0

        # clean up
        delete_volume(self.client, TEST_RG, ACCOUNT1, TEST_POOL_1, volumeName1, live=self.is_live)
        delete_pool(self.client, TEST_RG, ACCOUNT1, TEST_POOL_1, live=self.is_live)
        delete_account(self.client, TEST_RG, ACCOUNT1, live=self.is_live)
        if self.is_live:
            self.network_client.virtual_networks.begin_delete(TEST_RG, VNETNAME)

    @recorded_by_proxy
    def test_get_metadata(self):
        """Create a subvolume and check that a metadata request returns a result."""
        set_bodiless_matcher()
        ACCOUNT1 = self.get_resource_name(TEST_ACC_1+"-")
        volumeName1 = self.get_resource_name(TEST_VOL_1+"-")
        VNETNAME = self.get_resource_name(VNET+"-")
        if self.is_live:
            create_virtual_network(self.network_client, TEST_RG, LOCATION, VNETNAME, 'default')
        create_volume(self.client, TEST_RG, ACCOUNT1, TEST_POOL_1, volumeName1, LOCATION, vnet=VNETNAME, enable_subvolumes="Enabled")

        path = "/sub_vol_1.txt"
        size = 123
        subvolume_body = SubvolumeInfo(
            path=path,
            size=size
        )

        # create
        subvolume_info = self.client.subvolumes.begin_create(
            TEST_RG, ACCOUNT1, TEST_POOL_1, volumeName1, TEST_SUBVOLUME_1, subvolume_body).result()
        assert subvolume_info.name == ACCOUNT1 + "/" + TEST_POOL_1 + "/" + volumeName1 + "/" + TEST_SUBVOLUME_1
        assert subvolume_info.path == path

        # get metadata
        metadata = self.client.subvolumes.begin_get_metadata(
            TEST_RG, ACCOUNT1, TEST_POOL_1, volumeName1, TEST_SUBVOLUME_1).result()
        assert metadata is not None

        # clean up
        self.client.subvolumes.begin_delete(TEST_RG, ACCOUNT1, TEST_POOL_1, volumeName1, TEST_SUBVOLUME_1).wait()
        delete_volume(self.client, TEST_RG, ACCOUNT1, TEST_POOL_1, volumeName1, live=self.is_live)
        delete_pool(self.client, TEST_RG, ACCOUNT1, TEST_POOL_1, live=self.is_live)
        delete_account(self.client, TEST_RG, ACCOUNT1, live=self.is_live)
        if self.is_live:
            self.network_client.virtual_networks.begin_delete(TEST_RG, VNETNAME)