1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
|
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import json
from os.path import dirname, join, realpath
import pytest
from devtools_testutils import AzureMgmtTestCase
from search_service_preparer import SearchServicePreparer, SearchResourceGroupPreparer
from azure_devtools.scenario_tests import ReplayableTest
from azure.core import MatchConditions
from azure.core.credentials import AzureKeyCredential
from azure.core.exceptions import HttpResponseError
from azure.search.documents.indexes.models import(
SearchIndexerDataSourceConnection,
SearchIndexerDataContainer,
)
from azure.search.documents.indexes import SearchIndexerClient
CWD = dirname(realpath(__file__))
SCHEMA = open(join(CWD, "hotel_schema.json")).read()
try:
BATCH = json.load(open(join(CWD, "hotel_small.json")))
except UnicodeDecodeError:
BATCH = json.load(open(join(CWD, "hotel_small.json"), encoding='utf-8'))
TIME_TO_SLEEP = 5
CONNECTION_STRING = 'DefaultEndpointsProtocol=https;AccountName=storagename;AccountKey=NzhL3hKZbJBuJ2484dPTR+xF30kYaWSSCbs2BzLgVVI1woqeST/1IgqaLm6QAOTxtGvxctSNbIR/1hW8yH+bJg==;EndpointSuffix=core.windows.net'
class SearchDataSourcesClientTest(AzureMgmtTestCase):
FILTER_HEADERS = ReplayableTest.FILTER_HEADERS + ['api-key']
def _create_data_source_connection(self, name="sample-datasource"):
container = SearchIndexerDataContainer(name='searchcontainer')
data_source_connection = SearchIndexerDataSourceConnection(
name=name,
type="azureblob",
connection_string=CONNECTION_STRING,
container=container
)
return data_source_connection
@SearchResourceGroupPreparer(random_name_enabled=True)
@SearchServicePreparer(schema=SCHEMA, index_batch=BATCH)
def test_create_datasource(self, api_key, endpoint, index_name, **kwargs):
client = SearchIndexerClient(endpoint, AzureKeyCredential(api_key))
data_source_connection = self._create_data_source_connection()
result = client.create_data_source_connection(data_source_connection)
assert result.name == "sample-datasource"
assert result.type == "azureblob"
@SearchResourceGroupPreparer(random_name_enabled=True)
@SearchServicePreparer(schema=SCHEMA, index_batch=BATCH)
def test_delete_datasource(self, api_key, endpoint, index_name, **kwargs):
client = SearchIndexerClient(endpoint, AzureKeyCredential(api_key))
data_source_connection = self._create_data_source_connection()
result = client.create_data_source_connection(data_source_connection)
assert len(client.get_data_source_connections()) == 1
client.delete_data_source_connection("sample-datasource")
assert len(client.get_data_source_connections()) == 0
@SearchResourceGroupPreparer(random_name_enabled=True)
@SearchServicePreparer(schema=SCHEMA, index_batch=BATCH)
def test_get_datasource(self, api_key, endpoint, index_name, **kwargs):
client = SearchIndexerClient(endpoint, AzureKeyCredential(api_key))
data_source_connection = self._create_data_source_connection()
created = client.create_data_source_connection(data_source_connection)
result = client.get_data_source_connection("sample-datasource")
assert result.name == "sample-datasource"
@SearchResourceGroupPreparer(random_name_enabled=True)
@SearchServicePreparer(schema=SCHEMA, index_batch=BATCH)
def test_list_datasource(self, api_key, endpoint, index_name, **kwargs):
client = SearchIndexerClient(endpoint, AzureKeyCredential(api_key))
data_source_connection1 = self._create_data_source_connection()
data_source_connection2 = self._create_data_source_connection(name="another-sample")
created1 = client.create_data_source_connection(data_source_connection1)
created2 = client.create_data_source_connection(data_source_connection2)
result = client.get_data_source_connections()
assert isinstance(result, list)
assert set(x.name for x in result) == {"sample-datasource", "another-sample"}
@SearchResourceGroupPreparer(random_name_enabled=True)
@SearchServicePreparer(schema=SCHEMA, index_batch=BATCH)
def test_create_or_update_datasource(self, api_key, endpoint, index_name, **kwargs):
client = SearchIndexerClient(endpoint, AzureKeyCredential(api_key))
data_source_connection = self._create_data_source_connection()
created = client.create_data_source_connection(data_source_connection)
assert len(client.get_data_source_connections()) == 1
data_source_connection.description = "updated"
client.create_or_update_data_source_connection(data_source_connection)
assert len(client.get_data_source_connections()) == 1
result = client.get_data_source_connection("sample-datasource")
assert result.name == "sample-datasource"
assert result.description == "updated"
@SearchResourceGroupPreparer(random_name_enabled=True)
@SearchServicePreparer(schema=SCHEMA, index_batch=BATCH)
def test_create_or_update_datasource_if_unchanged(self, api_key, endpoint, index_name, **kwargs):
client = SearchIndexerClient(endpoint, AzureKeyCredential(api_key))
data_source_connection = self._create_data_source_connection()
created = client.create_data_source_connection(data_source_connection)
etag = created.e_tag
# Now update the data source connection
data_source_connection.description = "updated"
client.create_or_update_data_source_connection(data_source_connection)
# prepare data source connection
data_source_connection.e_tag = etag # reset to the original data source connection
data_source_connection.description = "changed"
with pytest.raises(HttpResponseError):
client.create_or_update_data_source_connection(data_source_connection, match_condition=MatchConditions.IfNotModified)
@SearchResourceGroupPreparer(random_name_enabled=True)
@SearchServicePreparer(schema=SCHEMA, index_batch=BATCH)
def test_delete_datasource_if_unchanged(self, api_key, endpoint, index_name, **kwargs):
client = SearchIndexerClient(endpoint, AzureKeyCredential(api_key))
data_source_connection = self._create_data_source_connection()
created = client.create_data_source_connection(data_source_connection)
etag = created.e_tag
# Now update the data source connection
data_source_connection.description = "updated"
client.create_or_update_data_source_connection(data_source_connection)
# prepare data source connection
data_source_connection.e_tag = etag # reset to the original data source connection
with pytest.raises(HttpResponseError):
client.delete_data_source_connection(data_source_connection, match_condition=MatchConditions.IfNotModified)
assert len(client.get_data_source_connections()) == 1
@SearchResourceGroupPreparer(random_name_enabled=True)
@SearchServicePreparer(schema=SCHEMA, index_batch=BATCH)
def test_delete_datasource_string_if_unchanged(self, api_key, endpoint, index_name, **kwargs):
client = SearchIndexerClient(endpoint, AzureKeyCredential(api_key))
data_source_connection = self._create_data_source_connection()
created = client.create_data_source_connection(data_source_connection)
etag = created.e_tag
# Now update the data source connection
data_source_connection.description = "updated"
client.create_or_update_data_source_connection(data_source_connection)
# prepare data source connection
data_source_connection.e_tag = etag # reset to the original data source connection
with pytest.raises(ValueError):
client.delete_data_source_connection(data_source_connection.name, match_condition=MatchConditions.IfNotModified)
|