File: query_tests.py

package info (click to toggle)
azure-cosmos-python 3.1.1-5
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 1,280 kB
  • sloc: python: 11,653; makefile: 155
file content (194 lines) | stat: -rw-r--r-- 9,355 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
import unittest
import uuid
import pytest
import azure.cosmos.cosmos_client as cosmos_client
import azure.cosmos.retry_utility as retry_utility
import test.test_config as test_config

@pytest.mark.usefixtures("teardown")
class QueryTest(unittest.TestCase):
    """Integration tests for query and change-feed behavior of CosmosClient.

    Covers querying through a name-based collection link that has a leading
    and a trailing slash, change-feed reads (with/without options, from
    current, from the beginning, with continuations and page sizes), the
    query-metrics response header, and the number of requests issued by a
    cross-partition ORDER BY query.

    These tests run against the live endpoint described by
    ``test.test_config``. The ``teardown`` fixture is defined elsewhere;
    presumably it cleans up created resources -- confirm in conftest.
    """

    # Shared configuration, client and database, created once when the
    # class body is executed (i.e. at import time).
    config = test_config._test_config
    host = config.host
    masterKey = config.masterKey
    connectionPolicy = config.connectionPolicy
    client = cosmos_client.CosmosClient(host, {'masterKey': masterKey}, connectionPolicy)
    created_db = config.create_database_if_not_exist(client)

    def test_first_and_last_slashes_trimmed_for_query_string(self):
        """A query through '/dbs/<db>/colls/<coll>/' still finds the document."""
        created_collection = self.config.create_multi_partition_collection_with_custom_pk_if_not_exist(self.client)
        document_definition = {'pk': 'pk', 'id': 'myId'}
        self.client.CreateItem(created_collection['_self'], document_definition)

        query_options = {'partitionKey': 'pk'}
        # Name-based link built deliberately with a leading and a trailing '/'.
        collection_link = '/dbs/' + self.created_db['id'] + '/colls/' + created_collection['id'] + '/'
        query = 'SELECT * from c'
        query_iterable = self.client.QueryItems(collection_link, query, query_options)

        iter_list = list(query_iterable)
        self.assertEqual(iter_list[0]['id'], 'myId')

    def test_query_change_feed(self):
        """Exercise change-feed reads through iteration and fetch_next_block.

        Checks empty feeds, etag/continuation headers, reading from the
        beginning vs. from a continuation, and paging with page sizes 1 and 100.
        """
        created_collection = self.config.create_multi_partition_collection_with_custom_pk_if_not_exist(self.client)
        collection_link = created_collection['_self']
        # The test targets partition #3
        pk_range_id = "3"

        # Read change feed without passing any options
        query_iterable = self.client.QueryItemsChangeFeed(collection_link)
        iter_list = list(query_iterable)
        self.assertEqual(len(iter_list), 0)

        # Read change feed with empty options (no partition key range ID)
        options = {}
        query_iterable = self.client.QueryItemsChangeFeed(collection_link, options)
        iter_list = list(query_iterable)
        self.assertEqual(len(iter_list), 0)

        # Read change feed from current should return an empty list
        options['partitionKeyRangeId'] = pk_range_id
        query_iterable = self.client.QueryItemsChangeFeed(collection_link, options)
        iter_list = list(query_iterable)
        self.assertEqual(len(iter_list), 0)
        self.assertIn('etag', self.client.last_response_headers)
        self.assertNotEqual(self.client.last_response_headers['etag'], '')

        # Read change feed from beginning should return an empty list
        options['isStartFromBeginning'] = True
        query_iterable = self.client.QueryItemsChangeFeed(collection_link, options)
        iter_list = list(query_iterable)
        self.assertEqual(len(iter_list), 0)
        self.assertIn('etag', self.client.last_response_headers)
        continuation1 = self.client.last_response_headers['etag']
        self.assertNotEqual(continuation1, '')

        # Create a document; reading the change feed should return that document
        document_definition = {'pk': 'pk', 'id': 'doc1'}
        self.client.CreateItem(collection_link, document_definition)
        query_iterable = self.client.QueryItemsChangeFeed(collection_link, options)
        iter_list = list(query_iterable)
        self.assertEqual(len(iter_list), 1)
        self.assertEqual(iter_list[0]['id'], 'doc1')
        self.assertIn('etag', self.client.last_response_headers)
        continuation2 = self.client.last_response_headers['etag']
        self.assertNotEqual(continuation2, '')
        self.assertNotEqual(continuation2, continuation1)

        # Create two new documents. Verify that the change feed after
        # continuation2 contains exactly those 2 documents, with page size 1
        # and page size 100.
        document_definition = {'pk': 'pk', 'id': 'doc2'}
        self.client.CreateItem(collection_link, document_definition)
        document_definition = {'pk': 'pk', 'id': 'doc3'}
        self.client.CreateItem(collection_link, document_definition)
        options['isStartFromBeginning'] = False

        expected_delta_ids = 'doc2.doc3.'
        for page_size in [1, 100]:
            # verify iterator
            options['continuation'] = continuation2
            options['maxItemCount'] = page_size
            query_iterable = self.client.QueryItemsChangeFeed(collection_link, options)
            actual_ids = ''.join(item['id'] + '.' for item in query_iterable)
            self.assertEqual(actual_ids, expected_delta_ids)

            # verify fetch_next_block
            # The options dict is not copied, so the continuation must be
            # reset before it is reused.
            options['continuation'] = continuation2
            query_iterable = self.client.QueryItemsChangeFeed(collection_link, options)
            count = 0
            expected_count = 2
            all_fetched_res = []
            while True:
                fetched_res = query_iterable.fetch_next_block()
                # Each block holds at most page_size items, and never more
                # than the number still outstanding.
                self.assertEqual(len(fetched_res), min(page_size, expected_count - count))
                count += len(fetched_res)
                all_fetched_res.extend(fetched_res)
                if not fetched_res:
                    break
            actual_ids = ''.join(item['id'] + '.' for item in all_fetched_res)
            self.assertEqual(actual_ids, expected_delta_ids)
            # verify there are no more results
            self.assertEqual(query_iterable.fetch_next_block(), [])

        # verify reading change feed from the beginning
        options['isStartFromBeginning'] = True
        options['continuation'] = None
        query_iterable = self.client.QueryItemsChangeFeed(collection_link, options)
        expected_ids = ['doc1', 'doc2', 'doc3']
        it = iter(query_iterable)
        for expected_id in expected_ids:
            doc = next(it)
            self.assertEqual(doc['id'], expected_id)
        self.assertIn('etag', self.client.last_response_headers)
        continuation3 = self.client.last_response_headers['etag']

        # verify reading empty change feed
        options['continuation'] = continuation3
        query_iterable = self.client.QueryItemsChangeFeed(collection_link, options)
        iter_list = list(query_iterable)
        self.assertEqual(len(iter_list), 0)

    def test_populate_query_metrics(self):
        """With populateQueryMetrics set, a well-formed metrics header is returned."""
        created_collection = self.config.create_multi_partition_collection_with_custom_pk_if_not_exist(self.client)
        document_definition = {'pk': 'pk', 'id': 'myId'}
        self.client.CreateItem(created_collection['_self'], document_definition)

        query_options = {'partitionKey': 'pk',
                         'populateQueryMetrics': True}
        query = 'SELECT * from c'
        query_iterable = self.client.QueryItems(created_collection['_self'], query, query_options)

        iter_list = list(query_iterable)
        self.assertEqual(iter_list[0]['id'], 'myId')

        metrics_header_name = 'x-ms-documentdb-query-metrics'
        self.assertIn(metrics_header_name, self.client.last_response_headers)
        metrics_header = self.client.last_response_headers[metrics_header_name]
        # Validate header is well-formed: "key1=value1;key2=value2;etc"
        metrics = metrics_header.split(';')
        self.assertGreater(len(metrics), 1)
        self.assertTrue(all('=' in metric for metric in metrics))

    def test_max_item_count_honored_in_order_by_query(self):
        """A cross-partition ORDER BY query issues the expected number of requests."""
        created_collection = self.config.create_multi_partition_collection_with_custom_pk_if_not_exist(self.client)
        for _ in range(10):
            document_definition = {'pk': 'pk', 'id': 'myId' + str(uuid.uuid4())}
            self.client.CreateItem(created_collection['_self'], document_definition)

        query = 'SELECT * from c ORDER BY c._ts'
        query_options = {'enableCrossPartitionQuery': True,
                         'maxItemCount': 1}
        query_iterable = self.client.QueryItems(created_collection['_self'], query, query_options)
        # 1 call to get the query plan, 1 call to get partition key ranges,
        # 10 calls to the one partition with the documents, 1 call each to
        # the other 4 partitions.
        # NOTE(review): the extra factor of 2 suggests each request goes
        # through _ExecuteFunction twice -- confirm against retry_utility.
        self.validate_query_requests_count(query_iterable, 16 * 2)

        query_options['maxItemCount'] = 100
        query_iterable = self.client.QueryItems(created_collection['_self'], query, query_options)
        # 1 call to get the query plan, 1 call to the one partition with the
        # documents, 1 call each to the other 4 partitions.
        self.validate_query_requests_count(query_iterable, 6 * 2)

    def validate_query_requests_count(self, query_iterable, expected_count):
        """Drain query_iterable and assert how many times _ExecuteFunction ran.

        Temporarily replaces retry_utility._ExecuteFunction with a counting
        wrapper. The original function is restored even if fetching raises,
        so the patch cannot leak into other tests.
        """
        self.count = 0
        self.OriginalExecuteFunction = retry_utility._ExecuteFunction
        retry_utility._ExecuteFunction = self._MockExecuteFunction
        try:
            block = query_iterable.fetch_next_block()
            while block:
                block = query_iterable.fetch_next_block()
        finally:
            retry_utility._ExecuteFunction = self.OriginalExecuteFunction
        self.assertEqual(self.count, expected_count)
        self.count = 0

    def _MockExecuteFunction(self, function, *args, **kwargs):
        """Count this call, then delegate to the saved original _ExecuteFunction."""
        self.count += 1
        return self.OriginalExecuteFunction(function, *args, **kwargs)


if '__main__' == __name__:
    # Allow running this module directly with the unittest runner.
    unittest.main()