File: test_excluded_locations_emulator.py

package info (click to toggle)
python-azure 20250603%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (191 lines) | stat: -rw-r--r-- 8,385 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# The MIT License (MIT)
# Copyright (c) Microsoft Corporation. All rights reserved.

import unittest
import uuid
import test_config
import pytest
from typing import Callable, List, Mapping, Any

from azure.core.rest import HttpRequest
from _fault_injection_transport import FaultInjectionTransport, ERROR_WITH_COUNTER
from azure.cosmos.container import ContainerProxy
from test_excluded_locations import (L1, L2, read_item_test_data,
                                     TestDataType, set_test_data_type,
                                     get_test_data_with_expected_output)
from test_fault_injection_transport import TestFaultInjectionTransport
from azure.cosmos.exceptions import CosmosHttpResponseError
from azure.cosmos.http_constants import ResourceType

CONFIG = test_config.TestConfig()
HOST = CONFIG.host
KEY = CONFIG.masterKey
DATABASE_ID = CONFIG.TEST_DATABASE_ID
SINGLE_PARTITION_CONTAINER_ID = CONFIG.TEST_SINGLE_PARTITION_CONTAINER_ID
SINGLE_PARTITION_PREFIX_PK_CONTAINER_ID = CONFIG.TEST_SINGLE_PARTITION_PREFIX_PK_CONTAINER_ID
L1_URL = CONFIG.local_host
L2_URL = L1_URL.replace("localhost", "127.0.0.1")
URL_TO_LOCATIONS = {
    L1_URL: L1,
    L2_URL: L2
}

set_test_data_type(TestDataType.ALL_TESTS)

# The output data is the number of retries that happened with different timeout region cases
# [timeout L1, timeout L2]
def metadata_read_with_excluded_locations_test_data() -> List[str]:
    client_only_output_data = [
        [1, 0],   #0
        [0, 1],   #1
        [1, 0],   #3
        [1, 0]    #4
    ]
    client_and_request_output_data = [
        [0, 1],   #0
        [0, 1],   #1
        [0, 1],   #2
        [1, 0],   #3
        [1, 0],   #4
        [1, 0],   #5
        [1, 0],   #6
        [1, 0],   #7
    ]
    return get_test_data_with_expected_output(client_only_output_data, client_and_request_output_data)

def get_location(
        initialized_objects: Mapping[str, Any],
        url_to_locations: Mapping[str, str] = URL_TO_LOCATIONS) -> str:
    # get Request URL
    header = initialized_objects['client'].client_connection.last_response_headers
    request_url = header["_request"].url

    # verify
    location = ""
    for url in url_to_locations:
        if request_url.startswith(url):
            location = url_to_locations[url]
            break
    return location

@pytest.mark.unittest
@pytest.mark.cosmosEmulator
class TestExcludedLocationsEmulator:
    @pytest.mark.parametrize('test_data', read_item_test_data())
    def test_delete_all_items_by_partition_key(self: "TestExcludedLocationsEmulator", test_data: List[List[str]]):
        # Init test variables
        preferred_locations, client_excluded_locations, request_excluded_locations, expected_location = test_data

        # Inject topology transformation that would make Emulator look like a multiple write region account
        # with two read regions
        custom_transport = FaultInjectionTransport()
        is_get_account_predicate: Callable[[HttpRequest], bool] = lambda \
            r: FaultInjectionTransport.predicate_is_database_account_call(r)
        emulator_as_multi_write_region_account_transformation = \
            lambda r, inner: FaultInjectionTransport.transform_topology_mwr(
                first_region_name=L1,
                second_region_name=L2,
                inner=inner,
                first_region_url=L1_URL,
                second_region_url=L2_URL,
            )
        custom_transport.add_response_transformation(
            is_get_account_predicate,
            emulator_as_multi_write_region_account_transformation)

        for multiple_write_locations in [True, False]:
            # Create client
            initialized_objects = TestFaultInjectionTransport.setup_method_with_custom_transport(
                custom_transport,
                default_endpoint=HOST,
                key=KEY,
                database_id=DATABASE_ID,
                container_id=SINGLE_PARTITION_CONTAINER_ID,
                preferred_locations=preferred_locations,
                excluded_locations=client_excluded_locations,
                multiple_write_locations=multiple_write_locations,
            )
            container: ContainerProxy = initialized_objects["col"]

            # create an item
            id_value: str = str(uuid.uuid4())
            document_definition = {'id': id_value, 'pk': id_value}
            container.create_item(body=document_definition)

            # API call: delete_all_items_by_partition_key
            if request_excluded_locations is None:
                container.delete_all_items_by_partition_key(id_value)
            else:
                container.delete_all_items_by_partition_key(id_value, excluded_locations=request_excluded_locations)

            # Verify endpoint locations
            actual_location = get_location(initialized_objects)
            if multiple_write_locations:
                assert actual_location == expected_location[0]
            else:
                assert actual_location == L1

    @pytest.mark.parametrize('test_data', metadata_read_with_excluded_locations_test_data())
    def test_metadata_read_with_excluded_locations(self: "TestExcludedLocationsEmulator", test_data: List[List[str]]):
        # Init test variables
        preferred_locations, client_excluded_locations, request_excluded_locations, expected_counts = test_data
        target_urls = [L1_URL, L2_URL]
        expected_locations = [L2, L1]
        for target_url, expected_location, expected_count in zip(target_urls, expected_locations, expected_counts):
            # Inject topology transformation that would make Emulator look like a multiple write region account
            # with two read regions
            custom_transport = FaultInjectionTransport()
            is_get_account_predicate: Callable[[HttpRequest], bool] = lambda \
                    r: FaultInjectionTransport.predicate_is_database_account_call(r)
            emulator_as_multi_write_region_account_transformation = \
                lambda r, inner: FaultInjectionTransport.transform_topology_mwr(
                    first_region_name=L1,
                    second_region_name=L2,
                    inner=inner,
                    first_region_url=L1_URL,
                    second_region_url=L2_URL,
                )
            custom_transport.add_response_transformation(
                is_get_account_predicate,
                emulator_as_multi_write_region_account_transformation)

            # Inject rule to simulate request timeout in target region
            is_request_to_target_region: Callable[[HttpRequest], bool] = lambda \
                    r: (FaultInjectionTransport.predicate_targets_region(r, target_url) and
                        FaultInjectionTransport.predicate_is_resource_type(r, ResourceType.Collection) and
                        not FaultInjectionTransport.predicate_is_write_operation(r, target_url))
            fault_factory = lambda r: custom_transport.error_with_counter(
                CosmosHttpResponseError(
                    status_code=408,
                    message="Request Time Out Error.")
            )
            custom_transport.add_fault(is_request_to_target_region, fault_factory)

            # Create client
            initialized_objects = TestFaultInjectionTransport.setup_method_with_custom_transport(
                custom_transport,
                default_endpoint=HOST,
                key=KEY,
                database_id=DATABASE_ID,
                container_id=SINGLE_PARTITION_PREFIX_PK_CONTAINER_ID,
                preferred_locations=preferred_locations,
                excluded_locations=client_excluded_locations,
                multiple_write_locations=True,
            )
            container: ContainerProxy = initialized_objects["col"]

            custom_transport.reset_counters()
            if request_excluded_locations is None:
                container.read()
            else:
                container.read(excluded_locations=request_excluded_locations)

            # Verify endpoint locations
            actual_location = get_location(initialized_objects)
            assert actual_location == expected_location
            actual_count = custom_transport.counters[ERROR_WITH_COUNTER]
            assert actual_count == expected_count


if __name__ == "__main__":
    unittest.main()