File: test_pool.py

package info (click to toggle)
python-azure 20230112%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 749,544 kB
  • sloc: python: 6,815,827; javascript: 287; makefile: 195; xml: 109; sh: 105
file content (144 lines) | stat: -rw-r--r-- 5,838 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import time
from azure.mgmt.resource import ResourceManagementClient
from devtools_testutils import AzureMgmtRecordedTestCase, recorded_by_proxy
from azure.mgmt.netapp.models import CapacityPool, CapacityPoolPatch
from test_account import create_account, delete_account
from setup import *
import azure.mgmt.netapp.models

# Size requested for every capacity pool created by these tests.
# The value equals 4 * 1024**4 (4 TiB if the unit is bytes -- TODO confirm
# against the CapacityPool API).
DEFAULT_SIZE = 4398046511104


def create_pool(client, rg=TEST_RG, acc_name=TEST_ACC_1, pool_name=TEST_POOL_1, location=LOCATION, pool_only=False):
    """Create a capacity pool and return the result of the create operation.

    Unless ``pool_only`` is true, the NetApp account is created first via
    ``create_account``. The pool is requested with ``SERVICE_LEVEL`` and
    ``DEFAULT_SIZE`` in the given ``location``.

    :param client: management client exposing a ``pools`` operations group.
    :param rg: resource group name.
    :param acc_name: NetApp account name.
    :param pool_name: name of the pool to create.
    :param location: location passed to the account and the pool.
    :param pool_only: skip account creation when true.
    :return: whatever ``begin_create_or_update(...).result()`` returns.
    """
    account_needed = not pool_only
    if account_needed:
        create_account(client, rg, acc_name, location)

    print("Creating pool {0} in NetApp Account '{1} '...".format(pool_name, acc_name))

    body = CapacityPool(service_level=SERVICE_LEVEL, size=DEFAULT_SIZE, location=location)
    poller = client.pools.begin_create_or_update(rg, acc_name, pool_name, body)
    # Block until the long-running operation has finished.
    created_pool = poller.result()

    print("\tdone")
    return created_pool


def wait_for_no_pool(client, rg, acc_name, pool_name, live=False):
    """Poll until fetching the pool fails, or give up after five attempts.

    This is a workaround for the async nature of certain ARM processes: a
    pool may still be retrievable for a while after its deletion completed.

    :param client: management client exposing ``pools.get``.
    :param rg: resource group name.
    :param acc_name: NetApp account name.
    :param pool_name: name of the pool expected to disappear.
    :param live: when true, sleep 10 seconds before every attempt.
    :return: ``None``. The function also returns silently when the pool is
        still retrievable after the fifth attempt.
    """
    for _ in range(5):
        if live:
            time.sleep(10)
        try:
            client.pools.get(rg, acc_name, pool_name)
        except Exception:
            # A failing get (e.g. not found) is what we are waiting for.
            # Only Exception is caught so that KeyboardInterrupt/SystemExit
            # still abort the test run instead of being read as "pool gone".
            break


def delete_pool(client, rg, acc_name, pool_name, live=False):
    """Delete a capacity pool, retrying while the service rejects the request.

    Nested resources seem to hang around for a little while even when
    apparently deleted, so a delete can fail at first (for example with
    "Can not delete resource before nested resources are deleted."). The
    delete is attempted up to five times and stops at the first success.
    Afterwards ``wait_for_no_pool`` is called to wait for the pool to stop
    being retrievable.

    This helper is best-effort: if all five attempts fail, the failures are
    printed but no exception is raised.

    :param client: management client exposing ``pools.begin_delete``.
    :param rg: resource group name.
    :param acc_name: NetApp account name.
    :param pool_name: name of the pool to delete.
    :param live: when true, sleep 10 seconds before every attempt.
    """
    print("Deleting pool {0} in NetApp Account '{1} '...".format(pool_name,acc_name))
    for attempt in range(1, 6):
        if live:
            # Give nested resources a chance to disappear before trying.
            time.sleep(10)
        try:
            client.pools.begin_delete(rg, acc_name, pool_name).wait()
        except Exception as err:
            # Ideally only the "nested resources" error would be retried,
            # but it should be safe to generalise. Report and try again.
            print("\tdelete attempt {0} failed: {1}".format(attempt, err))
        else:
            # Deletion was accepted and completed; no need to repeat it.
            break
    wait_for_no_pool(client, rg, acc_name, pool_name, live)
    print("\tdone")


class TestNetAppCapacityPool(AzureMgmtRecordedTestCase):
    """Recorded tests for capacity pool create, list, get, update, patch and delete.

    Each test creates its own NetApp account (named from ``TEST_ACC_1``) and
    removes the pools and the account again at the end.
    """

    def setup_method(self, method):
        """Create the NetApp management client used by every test."""
        self.client = self.create_mgmt_client(azure.mgmt.netapp.NetAppManagementClient)

    # Before tests are run live a resource group needs to be created along with vnet and subnet
    # Note that when tests are run in live mode it is best to run one test at a time.
    @recorded_by_proxy
    def test_create_delete_pool(self):
        """A created pool is listed, and is no longer listed after deletion."""
        account_name1 = self.get_resource_name(TEST_ACC_1+"-")
        pool = create_pool(self.client, TEST_RG, account_name1, TEST_POOL_1, LOCATION)
        assert pool.size == DEFAULT_SIZE
        assert pool.name == account_name1 + '/' + TEST_POOL_1

        pool_list = self.client.pools.list(TEST_RG, account_name1)
        assert len(list(pool_list)) == 1

        delete_pool(self.client, TEST_RG, account_name1, TEST_POOL_1, live=self.is_live)
        pool_list = self.client.pools.list(TEST_RG, account_name1)
        assert len(list(pool_list)) == 0

        delete_account(self.client, TEST_RG, account_name1, live=self.is_live)

    @recorded_by_proxy
    def test_list_pools(self):
        """Listing an account with two pools returns exactly those two pools."""
        account_name1 = self.get_resource_name(TEST_ACC_1+"-")
        create_pool(self.client, TEST_RG, account_name1, TEST_POOL_1, LOCATION)
        create_pool(self.client, TEST_RG, account_name1, TEST_POOL_2, LOCATION, pool_only=True)
        # Pool names are reported as "<account>/<pool>", as in the other tests.
        expected_names = sorted(account_name1 + '/' + name for name in (TEST_POOL_1, TEST_POOL_2))

        # Materialise the paged result once: it can only be iterated a single
        # time, so counting it and then looping over it again would make the
        # second pass see nothing and skip the name checks entirely.
        listed_pools = list(self.client.pools.list(TEST_RG, account_name1))
        assert len(listed_pools) == 2
        # Compare order-insensitively; the listing order is not relied upon.
        assert sorted(pool.name for pool in listed_pools) == expected_names

        delete_pool(self.client, TEST_RG, account_name1, TEST_POOL_1, live=self.is_live)
        delete_pool(self.client, TEST_RG, account_name1, TEST_POOL_2, live=self.is_live)
        delete_account(self.client, TEST_RG, account_name1, live=self.is_live)

    @recorded_by_proxy
    def test_get_pool_by_name(self):
        """A pool fetched by name reports the "<account>/<pool>" name."""
        account_name1 = self.get_resource_name(TEST_ACC_1+"-")
        create_pool(self.client, TEST_RG, account_name1, TEST_POOL_1, LOCATION)

        pool = self.client.pools.get(TEST_RG, account_name1, TEST_POOL_1)
        assert pool.name == account_name1 + '/' + TEST_POOL_1

        delete_pool(self.client, TEST_RG, account_name1, TEST_POOL_1, live=self.is_live)
        delete_account(self.client, TEST_RG, account_name1, live=self.is_live)

    @recorded_by_proxy
    def test_update_pool(self):
        """Re-submitting the pool through create_or_update switches qos_type to Manual."""
        account_name1 = self.get_resource_name(TEST_ACC_1+"-")
        pool = create_pool(self.client, TEST_RG, account_name1, TEST_POOL_1)
        assert pool.qos_type == "Auto"

        # NOTE(review): a CapacityPoolPatch is sent through begin_create_or_update
        # here, whereas create_pool sends a CapacityPool -- confirm this is intended.
        pool_body = CapacityPoolPatch(qos_type="Manual", size=DEFAULT_SIZE, location=LOCATION)
        pool = self.client.pools.begin_create_or_update(
            TEST_RG,
            account_name1,
            TEST_POOL_1,
            pool_body
        ).result()
        assert pool.qos_type == "Manual"

        delete_pool(self.client, TEST_RG, account_name1, TEST_POOL_1, live=self.is_live)
        delete_account(self.client, TEST_RG, account_name1, live=self.is_live)

    @recorded_by_proxy
    def test_patch_pool(self):
        """Patching a pool applies both the new qos_type and the tags."""
        account_name1 = self.get_resource_name(TEST_ACC_1+"-")
        create_pool(self.client, TEST_RG, account_name1, TEST_POOL_1)

        tag = {'Tag2': 'Value1'}
        capacity_pool_patch = CapacityPoolPatch(qos_type="Manual", tags=tag)

        pool = self.client.pools.begin_update(TEST_RG, account_name1, TEST_POOL_1, capacity_pool_patch).result()
        assert pool.qos_type == "Manual"
        assert pool.tags['Tag2'] == 'Value1'

        delete_pool(self.client, TEST_RG, account_name1, TEST_POOL_1, live=self.is_live)
        delete_account(self.client, TEST_RG, account_name1, live=self.is_live)