1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
|
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import pytest
from azure.core import MatchConditions
from azure.core.exceptions import HttpResponseError
from devtools_testutils import AzureRecordedTestCase, recorded_by_proxy, get_credential
from search_service_preparer import SearchEnvVarPreparer, search_decorator
from azure.search.documents.indexes.models import (
SearchIndexerDataSourceConnection,
SearchIndexerDataContainer,
)
from azure.search.documents.indexes import SearchIndexerClient
class TestSearchClientDataSources(AzureRecordedTestCase):
def _create_data_source_connection(self, cs, name):
container = SearchIndexerDataContainer(name="searchcontainer")
data_source_connection = SearchIndexerDataSourceConnection(
name=name, type="azureblob", connection_string=cs, container=container
)
return data_source_connection
@SearchEnvVarPreparer()
@search_decorator(schema="hotel_schema.json", index_batch="hotel_small.json")
@recorded_by_proxy
def test_data_source(self, endpoint, **kwargs):
storage_cs = kwargs.get("search_storage_connection_string")
client = SearchIndexerClient(endpoint, get_credential(), retry_backoff_factor=60)
self._test_create_datasource(client, storage_cs)
self._test_delete_datasource(client, storage_cs)
self._test_get_datasource(client, storage_cs)
self._test_list_datasources(client, storage_cs)
self._test_create_or_update_datasource(client, storage_cs)
self._test_create_or_update_datasource_if_unchanged(client, storage_cs)
self._test_delete_datasource_if_unchanged(client, storage_cs)
self._test_delete_datasource_string_if_unchanged(client, storage_cs)
def _test_create_datasource(self, client, storage_cs):
ds_name = "create"
data_source_connection = self._create_data_source_connection(storage_cs, ds_name)
result = client.create_data_source_connection(data_source_connection)
assert result.name == ds_name
assert result.type == "azureblob"
def _test_delete_datasource(self, client, storage_cs):
ds_name = "delete"
data_source_connection = self._create_data_source_connection(storage_cs, ds_name)
client.create_data_source_connection(data_source_connection)
expected_count = len(client.get_data_source_connections()) - 1
client.delete_data_source_connection(ds_name)
assert len(client.get_data_source_connections()) == expected_count
def _test_get_datasource(self, client, storage_cs):
ds_name = "get"
data_source_connection = self._create_data_source_connection(storage_cs, ds_name)
client.create_data_source_connection(data_source_connection)
result = client.get_data_source_connection(ds_name)
assert result.name == ds_name
def _test_list_datasources(self, client, storage_cs):
data_source_connection1 = self._create_data_source_connection(storage_cs, "list")
data_source_connection2 = self._create_data_source_connection(storage_cs, "list2")
client.create_data_source_connection(data_source_connection1)
client.create_data_source_connection(data_source_connection2)
result = client.get_data_source_connections()
assert isinstance(result, list)
assert set(x.name for x in result).intersection(set(["list", "list2"])) == set(["list", "list2"])
def _test_create_or_update_datasource(self, client, storage_cs):
ds_name = "cou"
data_source_connection = self._create_data_source_connection(storage_cs, ds_name)
client.create_data_source_connection(data_source_connection)
expected_count = len(client.get_data_source_connections())
data_source_connection.description = "updated"
client.create_or_update_data_source_connection(data_source_connection)
assert len(client.get_data_source_connections()) == expected_count
result = client.get_data_source_connection(ds_name)
assert result.name == ds_name
assert result.description == "updated"
def _test_create_or_update_datasource_if_unchanged(self, client, storage_cs):
ds_name = "couunch"
data_source_connection = self._create_data_source_connection(storage_cs, ds_name)
created = client.create_data_source_connection(data_source_connection)
etag = created.e_tag
# Now update the data source connection
data_source_connection.description = "updated"
client.create_or_update_data_source_connection(data_source_connection)
# prepare data source connection
data_source_connection.e_tag = etag # reset to the original data source connection
data_source_connection.description = "changed"
with pytest.raises(HttpResponseError):
client.create_or_update_data_source_connection(
data_source_connection, match_condition=MatchConditions.IfNotModified
)
def _test_delete_datasource_if_unchanged(self, client, storage_cs):
ds_name = "delunch"
data_source_connection = self._create_data_source_connection(storage_cs, ds_name)
created = client.create_data_source_connection(data_source_connection)
etag = created.e_tag
# Now update the data source connection
data_source_connection.description = "updated"
client.create_or_update_data_source_connection(data_source_connection)
# prepare data source connection
data_source_connection.e_tag = etag # reset to the original data source connection
with pytest.raises(HttpResponseError):
client.delete_data_source_connection(data_source_connection, match_condition=MatchConditions.IfNotModified)
def _test_delete_datasource_string_if_unchanged(self, client, storage_cs):
ds_name = "delstrunch"
data_source_connection = self._create_data_source_connection(storage_cs, ds_name)
created = client.create_data_source_connection(data_source_connection)
etag = created.e_tag
# Now update the data source connection
data_source_connection.description = "updated"
client.create_or_update_data_source_connection(data_source_connection)
# prepare data source connection
data_source_connection.e_tag = etag # reset to the original data source connection
with pytest.raises(ValueError):
client.delete_data_source_connection(
data_source_connection.name,
match_condition=MatchConditions.IfNotModified,
)
|