File: test_search_client_buffered_sender_live.py

package info (click to toggle)
python-azure 20250603%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (168 lines) | stat: -rw-r--r-- 6,815 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

import pytest
import time
from azure.core.exceptions import HttpResponseError
from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchIndexingBufferedSender, SearchClient
from devtools_testutils import AzureRecordedTestCase, recorded_by_proxy, get_credential
from search_service_preparer import SearchEnvVarPreparer, search_decorator

# Seconds to sleep (live runs only) after an indexing call before reading the
# index back, because there can be some lag before a document is searchable.
TIME_TO_SLEEP = 3


class TestSearchIndexingBufferedSender(AzureRecordedTestCase):
    """Exercise ``SearchIndexingBufferedSender`` indexing actions end to end.

    A single test drives the upload, delete, merge and merge-or-upload
    scenarios in sequence against one index. Every ``_test_*`` helper receives
    the document count expected before it runs and returns the count expected
    afterwards, so each step can assert against the running total.

    NOTE(review): each helper sets the private ``_batch_action_count`` attribute
    of the sender to 2 before submitting two documents; presumably this makes
    the sender flush as soon as both actions are queued -- confirm against the
    sender implementation.
    """

    @SearchEnvVarPreparer()
    @search_decorator(schema="hotel_schema.json", index_batch="hotel_small.json")
    @recorded_by_proxy
    def test_search_client_index_buffered_sender(self, endpoint, index_name):
        """Run all buffered-sender scenarios in order and release both clients.

        :param endpoint: Search service endpoint passed to both clients.
        :param index_name: Name of the index both clients operate on.
        """
        client = SearchClient(endpoint, index_name, get_credential(), retry_backoff_factor=60)
        try:
            batch_client = SearchIndexingBufferedSender(
                endpoint, index_name, get_credential(), retry_backoff_factor=60
            )
            try:
                # Document count the scenarios assume the index starts with.
                doc_count = 10
                doc_count = self._test_upload_documents_new(client, batch_client, doc_count)
                doc_count = self._test_upload_documents_existing(client, batch_client, doc_count)
                doc_count = self._test_delete_documents_existing(client, batch_client, doc_count)
                doc_count = self._test_delete_documents_missing(client, batch_client, doc_count)
                doc_count = self._test_merge_documents_existing(client, batch_client, doc_count)
                doc_count = self._test_merge_documents_missing(client, batch_client, doc_count)
                self._test_merge_or_upload_documents(client, batch_client, doc_count)
            finally:
                batch_client.close()
        finally:
            # Close the read-side client too, even if creating or closing the
            # sender raised, so its session is not leaked.
            client.close()

    def _wait_for_indexing(self):
        """Sleep ``TIME_TO_SLEEP`` seconds when ``self.is_live`` is true."""
        # There can be some lag before a document is searchable
        if self.is_live:
            time.sleep(TIME_TO_SLEEP)

    def _test_upload_documents_new(self, client, batch_client, doc_count):
        """Upload two documents with new keys and verify their stored fields.

        :param client: Client used to read the index back.
        :param batch_client: Buffered sender under test.
        :param doc_count: Document count expected before this step.
        :returns: Document count expected after this step (``doc_count + 2``).
        """
        batch_client._batch_action_count = 2
        docs = [
            {"hotelId": "1000", "rating": 5, "rooms": [], "hotelName": "Azure Inn"},
            {"hotelId": "1001", "rating": 4, "rooms": [], "hotelName": "Redmond Hotel"},
        ]
        batch_client.upload_documents(docs)
        doc_count += 2

        self._wait_for_indexing()

        assert client.get_document_count() == doc_count
        for doc in docs:
            result = client.get_document(key=doc["hotelId"])
            assert result["hotelId"] == doc["hotelId"]
            assert result["hotelName"] == doc["hotelName"]
            assert result["rating"] == doc["rating"]
            assert result["rooms"] == doc["rooms"]
        return doc_count

    def _test_upload_documents_existing(self, client, batch_client, doc_count):
        """Upload one new and one existing document; only the new one adds to the count.

        :param client: Client used to read the index back.
        :param batch_client: Buffered sender under test.
        :param doc_count: Document count expected before this step.
        :returns: Document count expected after this step (``doc_count + 1``).
        """
        batch_client._batch_action_count = 2
        # add one new and one existing
        docs = [
            {"hotelId": "1002", "rating": 5, "rooms": [], "hotelName": "Azure Inn"},
            {"hotelId": "3", "rating": 4, "rooms": [], "hotelName": "Redmond Hotel"},
        ]
        batch_client.upload_documents(docs)
        doc_count += 1

        self._wait_for_indexing()

        assert client.get_document_count() == doc_count
        return doc_count

    def _test_delete_documents_existing(self, client, batch_client, doc_count):
        """Delete documents "3" and "4" and verify they can no longer be fetched.

        :param client: Client used to read the index back.
        :param batch_client: Buffered sender under test.
        :param doc_count: Document count expected before this step.
        :returns: Document count expected after this step (``doc_count - 2``).
        """
        batch_client._batch_action_count = 2
        docs = [{"hotelId": "3"}, {"hotelId": "4"}]
        batch_client.delete_documents(docs)
        doc_count -= 2

        self._wait_for_indexing()

        assert client.get_document_count() == doc_count

        # Fetching a deleted key is expected to raise.
        for key in ("3", "4"):
            with pytest.raises(HttpResponseError):
                client.get_document(key=key)
        return doc_count

    def _test_delete_documents_missing(self, client, batch_client, doc_count):
        """Delete one existing and one missing document; the count drops by one.

        :param client: Client used to read the index back.
        :param batch_client: Buffered sender under test.
        :param doc_count: Document count expected before this step.
        :returns: Document count expected after this step (``doc_count - 1``).
        """
        batch_client._batch_action_count = 2
        # delete one existing and one missing
        docs = [{"hotelId": "1003"}, {"hotelId": "2"}]
        batch_client.delete_documents(docs)
        doc_count -= 1

        self._wait_for_indexing()

        assert client.get_document_count() == doc_count
        # Neither key should be retrievable afterwards.
        for key in ("1003", "2"):
            with pytest.raises(HttpResponseError):
                client.get_document(key=key)
        return doc_count

    def _test_merge_documents_existing(self, client, batch_client, doc_count):
        """Merge new ratings into documents "5" and "6" and verify them.

        :param client: Client used to read the index back.
        :param batch_client: Buffered sender under test.
        :param doc_count: Document count expected before this step.
        :returns: ``doc_count`` unchanged; merging adds no documents.
        """
        batch_client._batch_action_count = 2
        docs = [{"hotelId": "5", "rating": 1}, {"hotelId": "6", "rating": 2}]
        batch_client.merge_documents(docs)

        self._wait_for_indexing()

        assert client.get_document_count() == doc_count

        result = client.get_document(key="5")
        assert result["rating"] == 1

        result = client.get_document(key="6")
        assert result["rating"] == 2
        return doc_count

    def _test_merge_documents_missing(self, client, batch_client, doc_count):
        """Merge into one missing and one existing document.

        The missing key ("1003") must still be absent afterwards, while the
        existing document ("1") must carry the merged rating.

        :param client: Client used to read the index back.
        :param batch_client: Buffered sender under test.
        :param doc_count: Document count expected before this step.
        :returns: ``doc_count`` unchanged; merging adds no documents.
        """
        batch_client._batch_action_count = 2
        # merge to one existing and one missing document
        docs = [{"hotelId": "1003", "rating": 1}, {"hotelId": "1", "rating": 2}]
        batch_client.merge_documents(docs)

        self._wait_for_indexing()

        assert client.get_document_count() == doc_count

        with pytest.raises(HttpResponseError):
            client.get_document(key="1003")

        result = client.get_document(key="1")
        assert result["rating"] == 2
        return doc_count

    def _test_merge_or_upload_documents(self, client, batch_client, doc_count):
        """Merge-or-upload one missing and one existing document.

        The missing key ("1003") is expected to be created, and the existing
        document ("1") to carry the submitted rating.

        :param client: Client used to read the index back.
        :param batch_client: Buffered sender under test.
        :param doc_count: Document count expected before this step.
        :returns: Document count expected after this step (``doc_count + 1``).
        """
        batch_client._batch_action_count = 2
        # merge to one existing and one missing
        docs = [{"hotelId": "1003", "rating": 1}, {"hotelId": "1", "rating": 2}]
        batch_client.merge_or_upload_documents(docs)
        doc_count += 1

        self._wait_for_indexing()

        assert client.get_document_count() == doc_count

        result = client.get_document(key="1003")
        assert result["rating"] == 1

        result = client.get_document(key="1")
        assert result["rating"] == 2
        return doc_count