File: test_session_token_unit.py

package info (click to toggle)
python-azure 20251014%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 766,472 kB
  • sloc: python: 6,314,744; ansic: 804; javascript: 287; makefile: 198; sh: 198; xml: 109
file content (173 lines) | stat: -rw-r--r-- 8,410 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# The MIT License (MIT)
# Copyright (c) Microsoft Corporation. All rights reserved.

import unittest
import os

import pytest

from azure.cosmos._vector_session_token import VectorSessionToken
from azure.cosmos.exceptions import CosmosHttpResponseError


@pytest.mark.cosmosEmulator
class TestSessionTokenUnitTest(unittest.TestCase):
    """Unit tests for parsing and merging ``VectorSessionToken`` values.

    The token strings used below have the shape
    ``<version>#<global lsn>#<region>=<progress>#...`` (see the invalid-version,
    invalid-global-lsn and invalid-region-progress parsing tests).
    """

    # Environment variable the merge tests toggle to switch false progress merge on/off.
    _FALSE_PROGRESS_MERGE_ENV = "AZURE_COSMOS_SESSION_TOKEN_FALSE_PROGRESS_MERGE"

    @staticmethod
    def _restore_env(name, previous):
        """Put environment variable *name* back to *previous* (``None`` means it was unset)."""
        if previous is None:
            os.environ.pop(name, None)
        else:
            os.environ[name] = previous

    def _set_false_progress_merge(self, value: str):
        """Set the false progress merge variable to *value* for the rest of the current test.

        The prior state is restored through ``addCleanup``, so it is put back even when an
        assertion fails and the setting cannot leak into other tests.
        """
        name = self._FALSE_PROGRESS_MERGE_ENV
        previous = os.environ.get(name)
        os.environ[name] = value
        self.addCleanup(self._restore_env, name, previous)

    def _parse(self, token_string: str):
        """Create a session token from *token_string*, asserting that it parsed, and return it."""
        token = VectorSessionToken.create(token_string)
        self.assertIsNotNone(token)
        return token

    def _assert_merge_rejected(self, first: str, second: str, expected_message: str):
        """Assert that merging *second* into *first* raises ``CosmosHttpResponseError``.

        :param first: token string whose ``merge`` method is called.
        :param second: token string passed to ``merge``.
        :param expected_message: exact ``str()`` of the expected exception.
        """
        session_token1 = self._parse(first)
        session_token2 = self._parse(second)
        with self.assertRaises(CosmosHttpResponseError,
                               msg="Region progress can not be different when version is same") as caught:
            session_token1.merge(session_token2)
        self.assertEqual(str(caught.exception), expected_message)

    def test_validate_successful_session_token_parsing(self):
        # valid session token round-trips through create/convert_to_string unchanged
        session_token = "1#100#1=20#2=5#3=30"
        self.assertEqual(VectorSessionToken.create(session_token).convert_to_string(), "1#100#1=20#2=5#3=30")

    def test_validate_session_token_parsing_with_invalid_version(self):
        session_token = "foo#100#1=20#2=5#3=30"
        self.assertIsNone(VectorSessionToken.create(session_token))

    def test_validate_session_token_parsing_with_invalid_global_lsn(self):
        session_token = "1#foo#1=20#2=5#3=30"
        self.assertIsNone(VectorSessionToken.create(session_token))

    def test_validate_session_token_parsing_with_invalid_region_progress(self):
        session_token = "1#100#1=20#2=x#3=30"
        self.assertIsNone(VectorSessionToken.create(session_token))

    def test_validate_session_token_parsing_with_invalid_format(self):
        session_token = "1;100#1=20#2=40"
        self.assertIsNone(VectorSessionToken.create(session_token))

    def test_validate_session_token_parsing_from_empty_string(self):
        session_token = ""
        self.assertIsNone(VectorSessionToken.create(session_token))

    def _different_merge_scenarios(self):
        """Run the merge scenarios whose expected result is the same for both settings.

        Called by ``test_validate_session_token_comparison`` once with the environment as it
        is and once with false progress merge set to ``"false"``.
        """
        # higher vector clock version with a different set of regions
        session_token1 = self._parse("1#100#1=20#2=5#3=30")
        session_token2 = self._parse("2#105#4=10#2=5#3=30")
        self.assertFalse(session_token1.equals(session_token2))
        self.assertFalse(session_token2.equals(session_token1))

        session_token_merged = self._parse("2#105#2=5#3=30#4=10")
        self.assertTrue(session_token1.merge(session_token2).equals(session_token_merged))

        # same version and global lsn, differing region progress
        session_token1 = self._parse("1#100#1=20#2=5#3=30")
        session_token2 = self._parse("1#100#1=10#2=8#3=30")
        self.assertFalse(session_token1.equals(session_token2))
        self.assertFalse(session_token2.equals(session_token1))

        session_token_merged = self._parse("1#100#1=20#2=8#3=30")
        self.assertTrue(session_token_merged.equals(session_token1.merge(session_token2)))

        # same version, higher global lsn and differing region progress
        session_token1 = self._parse("1#100#1=20#2=5#3=30")
        session_token2 = self._parse("1#102#1=100#2=8#3=30")
        self.assertFalse(session_token1.equals(session_token2))
        self.assertFalse(session_token2.equals(session_token1))

        session_token_merged = self._parse("1#102#2=8#3=30#1=100")
        self.assertTrue(session_token_merged.equals(session_token1.merge(session_token2)))

        # same vector clock version with global lsn increase (no failover)
        session_token1 = self._parse("1#100#1=20#2=5")
        session_token2 = self._parse("1#197#1=20#2=5")
        self.assertTrue(session_token1.merge(session_token2).equals(
            VectorSessionToken.create("1#197#1=20#2=5")))

        # same vector clock version with global lsn increase and local lsn increase
        session_token1 = self._parse("1#100#1=20#2=5")
        session_token2 = self._parse("1#197#1=23#2=15")
        self.assertTrue(session_token1.merge(session_token2).equals(
            VectorSessionToken.create("1#197#1=23#2=15")))

        # same version with a different number of regions should throw error
        self._assert_merge_rejected(
            "1#101#1=20#2=5#3=30",
            "1#100#1=20#2=5#3=30#4=40",
            "Status code: 500\nCompared session tokens '1#101#1=20#2=5#3=30' "
            "and '1#100#1=20#2=5#3=30#4=40' have unexpected regions.")

        # same version with different region progress should throw error
        self._assert_merge_rejected(
            "1#101#1=20#2=5#3=30",
            "1#100#4=20#2=5#3=30",
            "Status code: 500\nCompared session tokens '1#101#1=20#2=5#3=30' "
            "and '1#100#4=20#2=5#3=30' have unexpected regions.")

    def test_validate_session_token_comparison(self):
        # First pass uses whatever the environment currently holds; the second pass
        # explicitly sets false progress merge to "false".
        self._different_merge_scenarios()
        self._set_false_progress_merge("false")
        self._different_merge_scenarios()

    def test_session_token_false_progress_merge(self):
        for false_progress_enabled in [True, False]:
            self.validate_different_session_token_false_progress_merge_scenarios(false_progress_enabled)

    def validate_different_session_token_false_progress_merge_scenarios(self, false_progress_enabled: bool):
        """Check merges of a version-1 token with a version-2 token that has a lower global lsn.

        :param false_progress_enabled: value written (as ``str``) to the false progress merge
            environment variable before merging. When ``True`` the expected merged token
            carries the global lsn of the higher-version token (100); when ``False`` it
            carries the larger global lsn (200).
        """
        self._set_false_progress_merge(str(false_progress_enabled))

        # (first token, second token, expected when enabled, expected when disabled)
        scenarios = (
            # vector clock version increase with the same regions
            ("1#200#1=20#2=5#3=30", "2#100#1=10#2=8#3=30",
             "2#100#1=20#2=8#3=30", "2#200#1=20#2=8#3=30"),
            # vector clock version increase with removed region progress should merge
            ("1#200#1=20#2=5#3=30", "2#100#1=10#2=5",
             "2#100#1=20#2=5", "2#200#1=20#2=5"),
            # vector clock version increase with new region progress should merge
            ("1#200#1=20#2=5", "2#100#1=10#2=5#3=30",
             "2#100#1=20#2=5#3=30", "2#200#1=20#2=5#3=30"),
        )
        for first, second, expected_enabled, expected_disabled in scenarios:
            session_token1 = self._parse(first)
            session_token2 = self._parse(second)
            expected_session_token = expected_enabled if false_progress_enabled else expected_disabled
            self.assertTrue(session_token1.merge(session_token2).equals(
                VectorSessionToken.create(expected_session_token)))

# Run the tests with the unittest runner when this module is executed as a script.
if __name__ == "__main__":
    unittest.main()