File: resource_testcase.py

package info (click to toggle)
python-azure 20181112%2Bgit-2
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 407,300 kB
  • sloc: python: 717,190; makefile: 201; sh: 76
file content (63 lines) | stat: -rw-r--r-- 2,442 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from collections import namedtuple

from azure_devtools.scenario_tests import AzureTestError

from azure.common.exceptions import CloudError
from azure.mgmt.resource import ResourceManagementClient

from . import AzureMgmtPreparer


RESOURCE_GROUP_PARAM = 'resource_group'


FakeResource = namedtuple(
    'FakeResource',
    ['name', 'id']
)


class ResourceGroupPreparer(AzureMgmtPreparer):
    def __init__(self, name_prefix='',
                 random_name_length=75,
                 parameter_name=RESOURCE_GROUP_PARAM,
                 parameter_name_for_location='location', location='westus',
                 disable_recording=True, playback_fake_resource=None,
                 client_kwargs=None):
        super(ResourceGroupPreparer, self).__init__(name_prefix, random_name_length,
                                                    disable_recording=disable_recording,
                                                    playback_fake_resource=playback_fake_resource,
                                                    client_kwargs=client_kwargs)
        self.location = location
        self.parameter_name = parameter_name
        self.parameter_name_for_location = parameter_name_for_location

    def create_resource(self, name, **kwargs):
        if self.is_live:
            self.client = self.create_mgmt_client(ResourceManagementClient)
            self.resource = self.client.resource_groups.create_or_update(
                name, {'location': self.location}
            )
        else:
            self.resource = self.resource or FakeResource(
                name=name,
                id="/subscriptions/00000000-0000-0000-0000-000000000000/resourceGroups/"+name
            )
        return {
            self.parameter_name: self.resource,
            self.parameter_name_for_location: self.location,
        }

    def remove_resource(self, name, **kwargs):
        if self.is_live:
            try:
                if 'wait_timeout' in kwargs:
                    azure_poller = self.client.resource_groups.delete(name)
                    azure_poller.wait(kwargs.get('wait_timeout'))
                    if azure_poller.done():
                        return
                    raise AzureTestError('Timed out waiting for resource group to be deleted.')
                else:
                    self.client.resource_groups.delete(name, polling=False)
            except CloudError:
                pass