File: test_pool.py

package info (click to toggle)
python-azure 20201208%2Bgit-6
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 1,437,920 kB
  • sloc: python: 4,287,452; javascript: 269; makefile: 198; sh: 187; xml: 106
file content (139 lines) | stat: -rw-r--r-- 5,583 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import time
import json
import unittest
from azure.mgmt.resource import ResourceManagementClient
from devtools_testutils import AzureMgmtTestCase
from azure.mgmt.netapp.models import CapacityPool, CapacityPoolPatch
from test_account import create_account, delete_account
from setup import *
import azure.mgmt.netapp.models
import unittest

# Names of the two pools created by test_list_pools; the test uses this list
# when checking the listed pool names and when waiting for the pools to
# disappear after deletion.
pools = [TEST_POOL_1, TEST_POOL_2]

def create_pool(client, rg=TEST_RG, acc_name=TEST_ACC_1, pool_name=TEST_POOL_1, location=LOCATION, pool_only=False):
    """Create (or update) a capacity pool and return the finished result.

    Unless ``pool_only`` is true, ``create_account`` is called first with the
    same client, resource group, account name and location.

    :param client: management client exposing ``pools.create_or_update``.
    :param rg: resource group name.
    :param acc_name: name of the account the pool is created under.
    :param pool_name: name of the pool.
    :param location: location passed to the account creation, the pool body
        and the trailing ``{'location': ...}`` argument.
    :param pool_only: when true, skip the account creation step.
    :return: whatever ``.result()`` yields on the object returned by
        ``client.pools.create_or_update``.
    """
    needs_account = not pool_only
    if needs_account:
        create_account(client, rg, acc_name, location)

    body = CapacityPool(location=location, size=DEFAULT_SIZE, service_level=SERVICE_LEVEL)
    operation = client.pools.create_or_update(body, rg, acc_name, pool_name, {'location': location})
    return operation.result()

def wait_for_no_pool(client, rg, acc_name, pool_name, live=False):
    """Poll ``client.pools.get`` until it raises, or give up after 5 attempts.

    A workaround for the async nature of certain ARM processes: the pool is
    looked up repeatedly and the first lookup that raises is taken as the
    signal that the pool is gone.

    :param client: management client exposing ``pools.get``.
    :param rg: resource group name.
    :param acc_name: account name.
    :param pool_name: name of the pool to wait on.
    :param live: when true, sleep 10 seconds before each attempt; when false
        the attempts run back to back with no delay.
    :return: ``None`` in every case; the function also returns silently when
        all five lookups succeed (i.e. the pool is still there).
    """
    for _ in range(5):
        if live:
            time.sleep(10)
        try:
            client.pools.get(rg, acc_name, pool_name)
        except Exception:
            # A failed lookup (not found) is what we are waiting for.
            # Only Exception is caught so that KeyboardInterrupt and
            # SystemExit still propagate instead of being mistaken for
            # "pool is gone".
            return

def delete_pool(client, rg, acc_name, pool_name, live=False):
    """Delete a pool, then wait for it to stop being retrievable.

    Nested resources seem to hang around for a little while even when
    apparently deleted, so the delete is issued inside a bounded loop: up to
    five attempts, each waited on with ``.wait()``. The loop ends at the
    first attempt that raises. Afterwards ``wait_for_no_pool`` is called with
    the same arguments.

    NOTE(review): the original comments suggest the intent was to retry on
    "Can not delete resource before nested resources are deleted.", yet a
    raised error ends the loop while a successful delete is repeated. The
    behaviour is kept as is because callers (and possibly recorded test
    sessions) may depend on the number of delete calls -- confirm before
    changing it.

    :param client: management client exposing ``pools.delete`` and
        ``pools.get``.
    :param rg: resource group name.
    :param acc_name: account name.
    :param pool_name: name of the pool to delete.
    :param live: when true, sleep 10 seconds before each delete attempt; the
        flag is also forwarded to ``wait_for_no_pool``.
    """
    for _ in range(5):
        if live:
            time.sleep(10)
        try:
            client.pools.delete(rg, acc_name, pool_name).wait()
        except Exception:
            # Want to catch specifically "Can not delete resource before
            # nested resources are deleted." but should be safe to
            # generalise. Only Exception is caught so KeyboardInterrupt and
            # SystemExit are not swallowed.
            break
    wait_for_no_pool(client, rg, acc_name, pool_name, live)


@unittest.skip("skip test")
class NetAppAccountTestCase(AzureMgmtTestCase):
    """Create/list/get/update/patch/delete tests for capacity pools.

    Every test creates its own account and pool(s) through ``create_pool``
    and removes them again at the end. The whole class is currently skipped.
    """

    def setUp(self):
        """Build the management client used by every test."""
        super(NetAppAccountTestCase, self).setUp()
        self.client = self.create_mgmt_client(azure.mgmt.netapp.AzureNetAppFilesManagementClient)

    def test_create_delete_pool(self):
        """A created pool is listed, and no longer listed once deleted."""
        pool = create_pool(self.client, TEST_RG, TEST_ACC_1, TEST_POOL_1, LOCATION)
        self.assertEqual(pool.size, DEFAULT_SIZE)
        self.assertEqual(pool.name, TEST_ACC_1 + '/' + TEST_POOL_1)

        pool_list = self.client.pools.list(TEST_RG, TEST_ACC_1)
        self.assertEqual(len(list(pool_list)), 1)

        self.client.pools.delete(TEST_RG, TEST_ACC_1, TEST_POOL_1).wait()
        pool_list = self.client.pools.list(TEST_RG, TEST_ACC_1)
        self.assertEqual(len(list(pool_list)), 0)

        wait_for_no_pool(self.client, TEST_RG, TEST_ACC_1, TEST_POOL_1, live=self.is_live)
        delete_account(self.client, TEST_RG, TEST_ACC_1, live=self.is_live)

    def test_list_pools(self):
        """Two pools created under one account are both listed."""
        create_pool(self.client, TEST_RG, TEST_ACC_1, TEST_POOL_1, LOCATION)
        create_pool(self.client, TEST_RG, TEST_ACC_1, TEST_POOL_2, LOCATION, pool_only=True)

        # Materialise the listing once. Counting it with list() and then
        # looping over the same object again may yield nothing on the second
        # pass, which would silently skip the name check.
        listed = list(self.client.pools.list(TEST_RG, TEST_ACC_1))
        self.assertEqual(len(listed), 2)

        # Pool names are reported as "<account>/<pool>", as asserted by the
        # create and get tests in this class. Compare order-independently
        # because nothing here establishes the order of the listing.
        expected_names = sorted(TEST_ACC_1 + '/' + name for name in pools)
        self.assertEqual(sorted(item.name for item in listed), expected_names)

        self.client.pools.delete(TEST_RG, TEST_ACC_1, TEST_POOL_1).wait()
        self.client.pools.delete(TEST_RG, TEST_ACC_1, TEST_POOL_2).wait()
        # Wait on each pool by name (previously a stale index was used here,
        # so the second pool was never waited on).
        for pool_name in pools:
            wait_for_no_pool(self.client, TEST_RG, TEST_ACC_1, pool_name, live=self.is_live)
        delete_account(self.client, TEST_RG, TEST_ACC_1, live=self.is_live)

    def test_get_pool_by_name(self):
        """A created pool can be fetched by name."""
        create_pool(self.client, TEST_RG, TEST_ACC_1, TEST_POOL_1, LOCATION)

        pool = self.client.pools.get(TEST_RG, TEST_ACC_1, TEST_POOL_1)
        self.assertEqual(pool.name, TEST_ACC_1 + '/' + TEST_POOL_1)

        self.client.pools.delete(TEST_RG, TEST_ACC_1, TEST_POOL_1).wait()
        wait_for_no_pool(self.client, TEST_RG, TEST_ACC_1, TEST_POOL_1, live=self.is_live)
        delete_account(self.client, TEST_RG, TEST_ACC_1, live=self.is_live)

    def test_update_pool(self):
        """create_or_update on an existing pool changes its service level."""
        pool = create_pool(self.client, TEST_RG, TEST_ACC_1, TEST_POOL_1)
        self.assertEqual(pool.service_level, "Premium")

        pool_body = CapacityPool(service_level="Standard", size=DEFAULT_SIZE, location=LOCATION)
        pool = self.client.pools.create_or_update(
            pool_body,
            TEST_RG,
            TEST_ACC_1,
            TEST_POOL_1,
            {'location': LOCATION}
        ).result()
        self.assertEqual(pool.service_level, "Standard")

        self.client.pools.delete(TEST_RG, TEST_ACC_1, TEST_POOL_1).wait()
        wait_for_no_pool(self.client, TEST_RG, TEST_ACC_1, TEST_POOL_1, live=self.is_live)
        delete_account(self.client, TEST_RG, TEST_ACC_1, live=self.is_live)

    def test_patch_pool(self):
        """A patch can change the service level and set tags."""
        create_pool(self.client, TEST_RG, TEST_ACC_1, TEST_POOL_1)

        tag = {'Tag2': 'Value1'}
        capacity_pool_patch = CapacityPoolPatch(service_level="Standard", tags=tag)

        pool = self.client.pools.update(capacity_pool_patch, TEST_RG, TEST_ACC_1, TEST_POOL_1).result()
        self.assertEqual(pool.service_level, "Standard")
        # assertEqual reports both values on failure, unlike assertTrue(a == b).
        self.assertEqual(pool.tags['Tag2'], 'Value1')

        self.client.pools.delete(TEST_RG, TEST_ACC_1, TEST_POOL_1).wait()
        wait_for_no_pool(self.client, TEST_RG, TEST_ACC_1, TEST_POOL_1, live=self.is_live)
        delete_account(self.client, TEST_RG, TEST_ACC_1, live=self.is_live)