File: test_multimaster.py

package info (click to toggle)
python-azure 20230112%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 749,544 kB
  • sloc: python: 6,815,827; javascript: 287; makefile: 195; xml: 109; sh: 105
file content (132 lines) | stat: -rw-r--r-- 5,014 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import unittest
import uuid
import azure.cosmos.cosmos_client as cosmos_client
import pytest
import azure.cosmos._constants as constants
from azure.cosmos.http_constants import HttpHeaders
from azure.cosmos import _retry_utility
import test_config
from azure.cosmos.partition_key import PartitionKey

# Module-level pytest mark: pytest applies `pytestmark` to every test collected
# from this file, so all tests here carry the custom `cosmosEmulator` marker.
# NOTE(review): presumably used to select tests that can run against the
# Cosmos DB emulator -- confirm against the project's pytest configuration.
pytestmark = pytest.mark.cosmosEmulator

@pytest.mark.usefixtures("teardown")
class MultiMasterTests(unittest.TestCase):
    """Check when the client sends the ``AllowTentativeWrites`` request header.

    The tests replace ``_retry_utility.ExecuteFunction`` with
    ``_MockExecuteFunction``. The first intercepted call is answered with a
    canned response whose ``EnableMultipleWritableLocations`` value is chosen
    by the test; every later call is forwarded to the real implementation
    after recording whether the outgoing request carried
    ``AllowTentativeWrites: true``.
    """

    host = test_config._test_config.host
    masterKey = test_config._test_config.masterKey
    connectionPolicy = test_config._test_config.connectionPolicy
    # Number of calls intercepted by _MockExecuteFunction so far.
    counter = 0
    # One boolean per recorded request (True when AllowTentativeWrites was
    # 'true'). Each test rebinds this on the instance before running, so the
    # class-level list itself is never appended to.
    last_headers = []

    def test_tentative_writes_header_present(self):
        """Header is expected on every request when the mocked flag is True."""
        self.last_headers = []
        self.EnableMultipleWritableLocations = True
        self._validate_tentative_write_headers()

    def test_tentative_writes_header_not_present(self):
        """Header is expected on no request when the mocked flag is False."""
        self.last_headers = []
        self.EnableMultipleWritableLocations = False
        self._validate_tentative_write_headers()

    def _validate_tentative_write_headers(self):
        """Run a fixed sequence of operations and verify the recorded headers.

        Creates a database, a container, an item and a stored procedure,
        executes the procedure, reads/replaces/upserts/deletes the item and
        finally deletes the database, all through a client configured with
        ``UseMultipleWriteLocations = True``. Afterwards the first eleven
        entries of ``self.last_headers`` must all equal
        ``self.EnableMultipleWritableLocations is True``.

        The monkeypatch on ``_retry_utility.ExecuteFunction`` and the change
        to the connection policy are undone in a ``finally`` block so that a
        failure in this test cannot leak into other tests.
        """
        policy = MultiMasterTests.connectionPolicy
        # The policy object comes from test_config and is reused as-is, so
        # remember the current flag in order to put it back afterwards.
        previous_flag = getattr(policy, 'UseMultipleWriteLocations', False)

        # Make the "first call" detection in the mock independent of whatever
        # happened on this instance before.
        self.counter = 0
        self.OriginalExecuteFunction = _retry_utility.ExecuteFunction
        _retry_utility.ExecuteFunction = self._MockExecuteFunction
        try:
            policy.UseMultipleWriteLocations = True
            client = cosmos_client.CosmosClient(MultiMasterTests.host, MultiMasterTests.masterKey,
                                                consistency_level="Session",
                                                connection_policy=policy)

            created_db = client.create_database(id='multi_master_tests ' + str(uuid.uuid4()))

            created_collection = created_db.create_container(
                id='test_db',
                partition_key=PartitionKey(path='/pk', kind='Hash')
            )

            document_definition = {'id': 'doc' + str(uuid.uuid4()),
                                   'pk': 'pk',
                                   'name': 'sample document',
                                   'operation': 'insertion'}
            created_document = created_collection.create_item(body=document_definition)

            sproc_definition = {
                'id': 'sample sproc' + str(uuid.uuid4()),
                'serverScript': 'function() {var x = 10;}'
            }
            sproc = created_collection.scripts.create_stored_procedure(body=sproc_definition)

            created_collection.scripts.execute_stored_procedure(
                sproc=sproc['id'],
                partition_key='pk'
            )

            created_collection.read_item(
                item=created_document,
                partition_key='pk'
            )

            created_document['operation'] = 'replace'
            replaced_document = created_collection.replace_item(
                item=created_document['id'],
                body=created_document
            )

            replaced_document['operation'] = 'upsert'
            upserted_document = created_collection.upsert_item(body=replaced_document)

            created_collection.delete_item(
                item=upserted_document,
                partition_key='pk'
            )

            client.delete_database(created_db)
        finally:
            # Always undo the global patch and the policy mutation, even when
            # one of the service calls above raised.
            _retry_utility.ExecuteFunction = self.OriginalExecuteFunction
            policy.UseMultipleWriteLocations = previous_flag

        is_allow_tentative_writes_set = self.EnableMultipleWritableLocations is True

        # Requests in the order they are expected to be recorded. Creating a
        # document makes one initial call to fetch the collection, hence two
        # entries for it.
        expected_requests = (
            'Create Database',
            'Create Container',
            'Create Document (initial collection fetch)',
            'Create Document',
            'Create Stored procedure',
            'Execute Stored procedure',
            'Read Document',
            'Replace Document',
            'Upsert Document',
            'Delete Document',
            'Delete Database',
        )

        self.assertGreaterEqual(
            len(self.last_headers), len(expected_requests),
            'recorded only {} requests, expected at least {}'.format(
                len(self.last_headers), len(expected_requests)))

        for index, operation in enumerate(expected_requests):
            self.assertEqual(
                self.last_headers[index], is_allow_tentative_writes_set,
                'unexpected AllowTentativeWrites state for request #{} ({})'.format(index, operation))

    def _MockExecuteFunction(self, function, *args, **kwargs):
        """Stand-in for ``_retry_utility.ExecuteFunction`` used by the tests.

        The first intercepted call is not forwarded: it returns a canned
        ``(result, headers)`` pair whose result only contains the
        ``EnableMultipleWritableLocations`` value chosen by the test.
        NOTE(review): presumably that first call is the client's initial
        database-account read -- confirm against the SDK.

        Every later call records whether ``args[4].headers`` carries
        ``AllowTentativeWrites == 'true'`` and is then delegated to the
        original function.
        """
        self.counter += 1
        if self.counter == 1:
            return {constants._Constants.EnableMultipleWritableLocations: self.EnableMultipleWritableLocations}, {}

        # The request object is read from positional index 4, so only inspect
        # calls that actually supply that many positional arguments.
        if len(args) > 4:
            request_headers = args[4].headers
            self.last_headers.append(HttpHeaders.AllowTentativeWrites in request_headers
                                     and request_headers[HttpHeaders.AllowTentativeWrites] == 'true')
        return self.OriginalExecuteFunction(function, *args, **kwargs)


# Allow running this module directly as a script: hand control to the
# unittest command-line runner, which discovers the TestCase classes above.
if __name__ == '__main__':
    unittest.main()