File: test_buffered_sender_async.py

package info (click to toggle)
python-azure 20201208%2Bgit-6
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 1,437,920 kB
  • sloc: python: 4,287,452; javascript: 269; makefile: 198; sh: 187; xml: 106
file content (128 lines) | stat: -rw-r--r-- 5,844 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
try:
    from unittest import mock
except ImportError:
    import mock

from azure.search.documents.aio import (
    SearchIndexingBufferedSender,
)
from azure.core.credentials import AzureKeyCredential
from azure.core.exceptions import HttpResponseError
from azure.search.documents.models import IndexingResult

# Shared credential handed to every SearchIndexingBufferedSender built in this
# module; the key is a placeholder string.
CREDENTIAL = AzureKeyCredential(key="test_api_key")

class TestSearchBatchingClientAsync:
    """Unit tests for the async ``SearchIndexingBufferedSender``.

    Every test builds a sender against the placeholder endpoint ``"endpoint"``
    and index ``"index name"`` and drives it inside ``async with``.

    NOTE(review): all tests are coroutine functions, so the suite presumably
    relies on an async-aware pytest plugin/configuration that is not visible
    in this file -- confirm the coroutines are actually awaited on collection.
    """

    async def test_search_indexing_buffered_sender_kwargs(self):
        """Check the sender's private settings when only ``window`` is passed."""
        async with SearchIndexingBufferedSender("endpoint", "index name", CREDENTIAL, window=100) as client:
            assert client._batch_action_count == 512
            assert client._max_retries == 3
            assert client._auto_flush_interval == 60
            assert client._auto_flush

    async def test_batch_queue(self):
        """Queue 1 + 2 + 3 + 1 actions, then dequeue and re-enqueue them all."""
        async with SearchIndexingBufferedSender("endpoint", "index name", CREDENTIAL, auto_flush=False) as client:
            assert client._index_documents_batch
            await client.upload_documents(["upload1"])
            await client.delete_documents(["delete1", "delete2"])
            await client.merge_documents(["merge1", "merge2", "merge3"])
            await client.merge_or_upload_documents(["merge_or_upload1"])
            assert len(client.actions) == 7
            # Dequeuing must drain the batch; enqueuing the same actions
            # must bring the count back to where it was.
            actions = await client._index_documents_batch.dequeue_actions()
            assert len(client.actions) == 0
            await client._index_documents_batch.enqueue_actions(actions)
            assert len(client.actions) == 7

    @mock.patch(
        "azure.search.documents._internal.aio._search_indexing_buffered_sender_async.SearchIndexingBufferedSender._process_if_needed"
    )
    async def test_process_if_needed(self, mock_process_if_needed):
        """Adding documents must invoke the (patched) ``_process_if_needed``."""
        async with SearchIndexingBufferedSender("endpoint", "index name", CREDENTIAL) as client:
            await client.upload_documents(["upload1"])
            await client.delete_documents(["delete1", "delete2"])
        assert mock_process_if_needed.called

    @mock.patch(
        "azure.search.documents._internal.aio._search_indexing_buffered_sender_async.SearchIndexingBufferedSender._cleanup"
    )
    async def test_context_manager(self, mock_cleanup):
        """The (patched) ``_cleanup`` must have been called once the context exits."""
        async with SearchIndexingBufferedSender("endpoint", "index name", CREDENTIAL, auto_flush=False) as client:
            await client.upload_documents(["upload1"])
            await client.delete_documents(["delete1", "delete2"])
        assert mock_cleanup.called

    async def test_flush(self):
        """``flush`` must leave no queued actions even if indexing always raises."""
        document = {
            'Category': 'Hotel',
            'HotelId': '1000',
            'Rating': 4.0,
            'Rooms': [],
            'HotelName': 'Azure Inn',
        }
        # Every indexing attempt raises HttpResponseError for the duration
        # of the test.
        with mock.patch.object(SearchIndexingBufferedSender, "_index_documents_actions", side_effect=HttpResponseError("Error")):
            async with SearchIndexingBufferedSender("endpoint", "index name", CREDENTIAL, auto_flush=False) as client:
                client._index_key = "HotelId"
                await client.upload_documents([document])
                await client.flush()
                assert len(client.actions) == 0

    async def test_callback_new(self):
        """The ``on_new`` callback must be called when a document is queued."""
        on_new = mock.AsyncMock()
        async with SearchIndexingBufferedSender("endpoint", "index name", CREDENTIAL, auto_flush=False, on_new=on_new) as client:
            await client.upload_documents(["upload1"])
            assert on_new.called

    async def test_callback_error(self):
        """The ``on_error`` callback must be called when indexing reports a failure."""
        async def mock_fail_index_documents(actions, timeout=86400):
            # Stand-in for the sender's indexing call: report the first
            # action as failed with status 400.
            # It deliberately touches no state on the test instance: a
            # previous revision updated ``self.uploaded``, which nothing in
            # this class initializes, so the stub raised AttributeError
            # before it could return the failed result.
            if len(actions) > 0:
                print("There is something wrong")
                result = IndexingResult()
                result.key = actions[0].additional_properties.get('id')
                result.status_code = 400
                result.succeeded = False
                return [result]

        on_error = mock.AsyncMock()
        async with SearchIndexingBufferedSender("endpoint",
                                               "index name",
                                               CREDENTIAL,
                                               auto_flush=False,
                                               on_error=on_error) as client:
            client._index_documents_actions = mock_fail_index_documents
            client._index_key = "id"
            await client.upload_documents({"id": 0})
            await client.flush()
            assert on_error.called

    async def test_callback_progress(self):
        """``on_progress`` and ``on_remove`` must be called when indexing succeeds."""
        async def mock_successful_index_documents(actions, timeout=86400):
            # Stand-in for the sender's indexing call: report the first
            # action as succeeded with status 200.
            if len(actions) > 0:
                result = IndexingResult()
                result.key = actions[0].additional_properties.get('id')
                result.status_code = 200
                result.succeeded = True
                return [result]

        on_progress = mock.AsyncMock()
        on_remove = mock.AsyncMock()
        async with SearchIndexingBufferedSender("endpoint",
                                               "index name",
                                               CREDENTIAL,
                                               auto_flush=False,
                                               on_progress=on_progress,
                                               on_remove=on_remove) as client:
            client._index_documents_actions = mock_successful_index_documents
            client._index_key = "id"
            await client.upload_documents({"id": 0})
            await client.flush()
            assert on_progress.called
            assert on_remove.called