File: test_session_token_helpers.py

package info (click to toggle)
python-azure 20250603%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (174 lines) | stat: -rw-r--r-- 9,859 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# The MIT License (MIT)
# Copyright (c) Microsoft Corporation. All rights reserved.

import random
import unittest

import pytest

import azure.cosmos.cosmos_client as cosmos_client
import test_config
from azure.cosmos import DatabaseProxy
from azure.cosmos._change_feed.feed_range_internal import FeedRangeInternalEpk
from azure.cosmos._routing.routing_range import Range

# Keys of the dictionary returned by the ``setup`` fixture: the container
# client is stored under COLLECTION and the database client under DATABASE.
COLLECTION = "created_collection"
DATABASE = "created_db"
@pytest.fixture(scope="class")
def setup():
    """Build the database and container clients shared by the test class.

    Returns:
        dict: ``{DATABASE: <database client>, COLLECTION: <container client>}``
        obtained from a ``CosmosClient`` created with the host and master key
        configured on ``TestSessionTokenHelpers``.

    Raises:
        Exception: if the configured master key or host is still one of the
            placeholder values, i.e. no real account has been configured.
    """
    host = TestSessionTokenHelpers.host
    master_key = TestSessionTokenHelpers.masterKey
    if master_key == '[YOUR_KEY_HERE]' or host == '[YOUR_ENDPOINT_HERE]':
        raise Exception(
            "You must specify your Azure Cosmos account values for "
            "'masterKey' and 'host' at the top of this class to run the "
            "tests.")
    # Bind the client directly: the previous trailing comma wrapped it in a
    # one-element tuple that then had to be unpacked with ``[0]``.
    test_client = cosmos_client.CosmosClient(host, master_key)
    created_db = test_client.get_database_client(TestSessionTokenHelpers.TEST_DATABASE_ID)
    return {
        DATABASE: created_db,
        COLLECTION: created_db.get_container_client(TestSessionTokenHelpers.TEST_COLLECTION_ID)
    }

def create_split_ranges():
    """Return the parametrized scenarios for ``test_simulated_splits_merges``.

    Each scenario is a 3-tuple of:
      * a list of ``((range_min, range_max), session_token)`` pairs,
      * the ``(range_min, range_max)`` target feed range, and
      * the session token string the test expects back.
    """
    # Named range bounds keep the scenario table readable.
    aa_bb = ("AA", "BB")
    aa_cc = ("AA", "CC")
    aa_dd = ("AA", "DD")
    aa_ee = ("AA", "EE")
    bb_cc = ("BB", "CC")
    bb_dd = ("BB", "DD")
    bb_ff = ("BB", "FF")
    cc_dd = ("CC", "DD")

    scenarios = []

    # split with two children
    scenarios.append((
        [(aa_dd, "0:1#51#3=52"), (aa_bb, "1:1#55#3=52"), (bb_dd, "2:1#54#3=52")],
        aa_dd,
        "1:1#55#3=52,2:1#54#3=52",
    ))

    # same range different partition key range ids
    scenarios.append((
        [(aa_dd, "1:1#51#3=52"), (aa_dd, "0:1#55#3=52")],
        aa_dd,
        "0:1#55#3=52",
    ))

    # split with one child
    scenarios.append((
        [(aa_dd, "0:1#51#3=52"), (aa_bb, "1:1#55#3=52")],
        aa_dd,
        "0:1#51#3=52,1:1#55#3=52",
    ))

    # Highest GLSN, which is 55 in this         # cspell:disable-line
    # ex. "1:1#55#3=52", is the one that will be returned because
    # it is higher than all of the other feed range contained in the same
    # range
    scenarios.append((
        [(aa_dd, "0:1#42#3=52"), (aa_bb, "1:1#51#3=52"),
         (bb_cc, "1:1#53#3=52"), (cc_dd, "1:1#55#3=52")],
        aa_dd,
        "1:1#55#3=52",
    ))

    # Highest GLSN, which is 60 in this            # cspell:disable-line
    # ex. "1:1#60#3=52", is the one that will be returned because
    # it is higher than all of the other feed range contained in the same
    # range with some of them overlapping with each other
    scenarios.append((
        [(aa_dd, "0:1#60#3=52"), (aa_bb, "1:1#51#3=52"),
         (bb_cc, "1:1#53#3=52"), (cc_dd, "1:1#55#3=52")],
        aa_dd,
        "0:1#60#3=52",
    ))

    # AA-DD can be created from the other ranges
    # but the GLSN's are not all larger than the one  # cspell:disable-line
    # in the AA-DD range so we just compound as cannot make
    # conclusions in this case
    scenarios.append((
        [(aa_dd, "0:1#60#3=52"), (aa_bb, "1:1#51#3=52"),
         (bb_cc, "1:1#66#3=52"), (cc_dd, "1:1#55#3=52")],
        aa_dd,
        "0:1#60#3=52,1:1#66#3=52",
    ))

    # merge with one child
    scenarios.append((
        [(aa_dd, "3:1#55#3=52"), (aa_bb, "1:1#51#3=52")],
        aa_dd,
        "3:1#55#3=52",
    ))

    # merge with two children
    scenarios.append((
        [(aa_dd, "3:1#55#3=52"), (aa_bb, "1:1#51#3=52"), (bb_dd, "2:1#54#3=52")],
        aa_dd,
        "3:1#55#3=52",
    ))

    # compound session token
    scenarios.append((
        [(aa_dd, "2:1#54#3=52,1:1#55#3=52"), (aa_bb, "0:1#51#3=52")],
        aa_bb,
        "2:1#54#3=52,1:1#55#3=52,0:1#51#3=52",
    ))

    # several compound session token with one range
    scenarios.append((
        [(aa_dd, "2:1#57#3=52,1:1#57#3=52"), (aa_dd, "2:1#56#3=52,1:1#58#3=52")],
        aa_dd,
        "2:1#57#3=52,1:1#58#3=52",
    ))

    # overlapping ranges
    scenarios.append((
        [(aa_cc, "0:1#54#3=52"), (bb_ff, "2:1#51#3=52")],
        aa_ee,
        "0:1#54#3=52,2:1#51#3=52",
    ))

    # different version numbers
    scenarios.append((
        [(aa_bb, "0:1#54#3=52"), (aa_bb, "0:2#57#3=53")],
        aa_bb,
        "0:2#57#3=53",
    ))

    # mixed scenarios
    scenarios.append((
        [(aa_dd, "3:1#60#3=53"), (aa_bb, "1:1#54#3=52"), (aa_bb, "1:1#52#3=53"),
         (bb_cc, "1:1#53#3=52"), (bb_cc, "6:1#70#3=55,4:1#90#3=52"),
         (cc_dd, "1:1#55#3=52")],
        aa_dd,
        "3:1#60#3=53,6:1#70#3=55,4:1#90#3=52",
    ))

    return scenarios

@pytest.mark.cosmosEmulator
@pytest.mark.unittest
@pytest.mark.usefixtures("setup")
class TestSessionTokenHelpers:
    """Tests for ``get_latest_session_token`` on the container client.

    Every test builds a list of ``(feed_range, session_token)`` pairs, asks the
    container client provided by the ``setup`` fixture for the latest session
    token of a target feed range, and checks the returned string (or the
    raised error).
    """

    created_db: DatabaseProxy = None
    client: cosmos_client.CosmosClient = None
    host = test_config.TestConfig.host
    masterKey = test_config.TestConfig.masterKey
    configs = test_config.TestConfig
    TEST_DATABASE_ID = configs.TEST_DATABASE_ID
    TEST_COLLECTION_ID = configs.TEST_SINGLE_PARTITION_CONTAINER_ID

    @staticmethod
    def _epk_feed_range(range_min, range_max):
        """Return the dict form of an EPK feed range from ``range_min`` to ``range_max``.

        The ``Range`` is always built with the flags ``True, False``
        (presumably min-inclusive / max-exclusive -- confirm against ``Range``).
        """
        return FeedRangeInternalEpk(Range(range_min, range_max, True, False)).to_dict()

    @staticmethod
    def _random_session_token():
        """Return a ``"0:1#<a>#3=<b>"`` token with ``a`` and ``b`` drawn from 1..100."""
        return f"0:1#{random.randint(1, 100)}#3={random.randint(1, 100)}"

    def test_get_session_token_update(self, setup):
        """Two tokens for the same feed range are combined into one token."""
        feed_range = self._epk_feed_range("AA", "BB")
        feed_ranges_and_session_tokens = [
            (feed_range, "0:1#54#3=50"),
            (feed_range, "0:1#51#3=52"),
        ]
        session_token = setup[COLLECTION].get_latest_session_token(feed_ranges_and_session_tokens, feed_range)
        assert session_token == "0:1#54#3=52"

    def test_many_session_tokens_update_same_range(self, setup):
        """A token larger than 1000 random ones on the same range is returned."""
        feed_range = self._epk_feed_range("AA", "BB")
        feed_ranges_and_session_tokens = [
            (feed_range, self._random_session_token()) for _ in range(1000)
        ]
        # Both numeric fields (101) exceed the 1..100 bounds of the random tokens.
        session_token = "0:1#101#3=101"
        feed_ranges_and_session_tokens.append((feed_range, session_token))
        updated_session_token = setup[COLLECTION].get_latest_session_token(feed_ranges_and_session_tokens,
                                                                           feed_range)
        assert updated_session_token == session_token

    def test_many_session_tokens_update(self, setup):
        """Same as the single-range test, with extra pairs on other ranges mixed in."""
        feed_range = self._epk_feed_range("AA", "BB")
        feed_ranges_and_session_tokens = [
            (feed_range, self._random_session_token()) for _ in range(1000)
        ]

        # adding irrelevant feed ranges
        feed_range1 = self._epk_feed_range("CC", "FF")
        feed_range2 = self._epk_feed_range("00", "55")
        for i in range(1000):
            session_token = self._random_session_token()
            # Alternate between the two unrelated ranges.
            irrelevant_range = feed_range1 if i % 2 == 0 else feed_range2
            feed_ranges_and_session_tokens.append((irrelevant_range, session_token))

        session_token = "0:1#101#3=101"
        feed_ranges_and_session_tokens.append((feed_range, session_token))
        updated_session_token = setup[COLLECTION].get_latest_session_token(feed_ranges_and_session_tokens,
                                                                           feed_range)
        assert updated_session_token == session_token

    @pytest.mark.parametrize("split_ranges, target_feed_range, expected_session_token", create_split_ranges())
    def test_simulated_splits_merges(self, setup, split_ranges, target_feed_range, expected_session_token):
        """Check the token returned for each scenario from ``create_split_ranges``.

        :param split_ranges: list of ``((range_min, range_max), session_token)`` pairs.
        :param target_feed_range: ``(range_min, range_max)`` of the range to query.
        :param expected_session_token: token string the call must return.
        """
        actual_split_ranges = [
            (self._epk_feed_range(range_min, range_max), session_token)
            for (range_min, range_max), session_token in split_ranges
        ]
        # Use the full max bound. Indexing it again (``[1][1]``) would keep
        # only one character of the string, e.g. "DD" -> "D", and query the
        # wrong target range.
        target_min, target_max = target_feed_range
        target = self._epk_feed_range(target_min, target_max)
        updated_session_token = setup[COLLECTION].get_latest_session_token(actual_split_ranges, target)
        assert updated_session_token == expected_session_token

    def test_invalid_feed_range(self, setup):
        """A target range that overlaps none of the inputs raises ``ValueError``."""
        feed_ranges_and_session_tokens = [(self._epk_feed_range("AA", "BB"), "0:1#54#3=50")]
        non_overlapping_target = self._epk_feed_range("CC", "FF")
        with pytest.raises(ValueError, match='There were no overlapping feed ranges with the target.'):
            setup[COLLECTION].get_latest_session_token(feed_ranges_and_session_tokens,
                                                       non_overlapping_target)

# Allow running this file directly as a script.
# NOTE(review): TestSessionTokenHelpers is a plain pytest-style class (it does
# not subclass unittest.TestCase), so unittest.main() presumably collects no
# tests from this module -- confirm whether pytest should be invoked instead.
if __name__ == '__main__':
    unittest.main()