File: disable_test_mgmt_cosmosdb.py

package info (click to toggle)
python-azure 20230112%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 749,544 kB
  • sloc: python: 6,815,827; javascript: 287; makefile: 195; xml: 109; sh: 105
file content (142 lines) | stat: -rw-r--r-- 4,977 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# coding: utf-8

#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------
import unittest

import azure.mgmt.cosmosdb
from msrestazure.azure_exceptions import CloudError
from devtools_testutils import AzureMgmtTestCase, ResourceGroupPreparer


class MgmtCosmosDBTest(AzureMgmtTestCase):

    def setUp(self):
        super(MgmtCosmosDBTest, self).setUp()
        self.client = self.create_mgmt_client(
            azure.mgmt.cosmosdb.CosmosDBManagementClient
        )

    @unittest.skip('hard to test')
    @ResourceGroupPreparer()
    def test_accounts_create(self, resource_group, location):
        account_name = self.get_resource_name('pycosmosdbx1')

        self.assertFalse(self.client.database_accounts.check_name_exists(account_name))

        async_cosmosdb_create = self.client.database_accounts.create_or_update(
            resource_group.name,
            account_name,
            {
                'location': location,
                'locations': [{
                    'location_name': self.region
                }]
            }
        )
        account = async_cosmosdb_create.result()
        self.assertIsNotNone(account)
        # Rest API issue
        # self.assertEqual(account.name, account_name)

    @unittest.skip('hard to test')
    @ResourceGroupPreparer()
    def test_accounts_features(self, resource_group, location):
        account_name = self.get_resource_name('pycosmosdbx2')

        if not self.is_playback():
            async_cosmosdb_create = self.client.database_accounts.create_or_update(
                resource_group.name,
                account_name,
                {
                    'location': location,
                    'locations': [{
                        'location_name': self.region
                    }]
                }
            )
            async_cosmosdb_create.wait()

        account = self.client.database_accounts.get(
            resource_group.name,
            account_name
        )
        self.assertEqual(account.name, account_name)

        my_accounts = list(self.client.database_accounts.list_by_resource_group(resource_group.name))
        self.assertEqual(len(my_accounts), 1)
        self.assertEqual(my_accounts[0].name, account_name)

        my_accounts = list(self.client.database_accounts.list())
        self.assertTrue(len(my_accounts) >= 1)
        self.assertTrue(any(db.name == account_name for db in my_accounts))

        async_change = self.client.database_accounts.failover_priority_change(
            resource_group.name,
            account_name,
            [{
                'location_name': self.region,
                'failover_priority': 0
            }]
        )
        async_change.wait()

        my_keys = self.client.database_accounts.list_keys(
            resource_group.name,
            account_name
        )
        self.assertIsNotNone(my_keys.primary_master_key)
        self.assertIsNotNone(my_keys.secondary_master_key)
        self.assertIsNotNone(my_keys.primary_readonly_master_key)
        self.assertIsNotNone(my_keys.secondary_readonly_master_key)


        my_keys = self.client.database_accounts.list_read_only_keys(
            resource_group.name,
            account_name
        )
        self.assertIsNotNone(my_keys.primary_readonly_master_key)
        self.assertIsNotNone(my_keys.secondary_readonly_master_key)


        async_regenerate = self.client.database_accounts.regenerate_key(
            resource_group.name,
            account_name,
            "primary"
        )
        async_regenerate.wait()

    @unittest.skip('hard to test')
    @ResourceGroupPreparer()
    def test_accounts_delete(self, resource_group, location):
        account_name = self.get_resource_name('pydocumentdbx3')

        if not self.is_playback():
            async_cosmosdb_create = self.client.database_accounts.create_or_update(
                resource_group.name,
                account_name,
                {
                    'location': location,
                    'locations': [{
                        'location_name': self.region
                    }]
                }
            )
            async_cosmosdb_create.wait()

        # Current implementation of msrestazure does not support 404 as a end of LRO delete
        # https://github.com/Azure/msrestazure-for-python/issues/7
        async_delete = self.client.database_accounts.delete(resource_group.name, account_name)
        try:
            async_delete.wait()
        except CloudError as err:
            if err.response.status_code != 404:
                raise


#------------------------------------------------------------------------------
if __name__ == '__main__':
    unittest.main()