File: sample_indexers_operations_async.py

package info (click to toggle)
python-azure 20250603%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (132 lines) | stat: -rw-r--r-- 4,321 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# coding: utf-8

# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

"""
FILE: sample_indexers_operations_async.py
DESCRIPTION:
    This sample demonstrates how to create, list, get, run, reset, and delete an Indexer.
USAGE:
    python sample_indexers_operations_async.py

    Set the environment variables with your own values before running the sample:
    1) AZURE_SEARCH_SERVICE_ENDPOINT - the endpoint of your Azure Cognitive Search service
    2) AZURE_SEARCH_API_KEY - your search API key
    3) AZURE_STORAGE_CONNECTION_STRING - the connection string for the Azure Blob Storage data source
"""

import asyncio
import os

# Required configuration, read when the module is loaded; a missing variable
# raises KeyError before any sample code runs.
service_endpoint = os.environ["AZURE_SEARCH_SERVICE_ENDPOINT"]
key = os.environ["AZURE_SEARCH_API_KEY"]
# Passed to the data source connection built in create_indexer().
connection_string = os.environ["AZURE_STORAGE_CONNECTION_STRING"]

from azure.core.credentials import AzureKeyCredential
from azure.search.documents.indexes.models import (
    SearchIndexerDataContainer,
    SearchIndexerDataSourceConnection,
    SearchIndex,
    SearchIndexer,
    SimpleField,
    SearchFieldDataType,
)
from azure.search.documents.indexes.aio import SearchIndexerClient, SearchIndexClient

# Module-level async client shared by every sample coroutine below; main() closes it.
indexers_client = SearchIndexerClient(service_endpoint, AzureKeyCredential(key))


async def create_indexer():
    """Create the target index, a blob data source connection, and the sample indexer.

    The index is created through a ``SearchIndexClient`` that is closed as soon
    as the call finishes; the data source connection and the indexer are
    created through the module-level ``indexers_client``. A confirmation line
    is printed once the indexer call returns.
    """
    # the indexer needs an index to write into, so build that one first
    hotels_index = SearchIndex(
        name="async-indexer-hotels",
        fields=[
            SimpleField(name="hotelId", type=SearchFieldDataType.String, key=True),
            SimpleField(name="baseRate", type=SearchFieldDataType.Double),
        ],
    )
    index_client = SearchIndexClient(service_endpoint, AzureKeyCredential(key))
    async with index_client:
        await index_client.create_index(hotels_index)

    # [START create_indexer_async]
    # create a datasource
    blob_container = SearchIndexerDataContainer(name="searchcontainer")
    datasource_definition = SearchIndexerDataSourceConnection(
        name="async-indexer-datasource",
        type="azureblob",
        connection_string=connection_string,
        container=blob_container,
    )
    data_source = await indexers_client.create_data_source_connection(datasource_definition)

    # create an indexer
    indexer_definition = SearchIndexer(
        name="async-sample-indexer",
        data_source_name="async-indexer-datasource",
        target_index_name="async-indexer-hotels",
    )
    result = await indexers_client.create_indexer(indexer_definition)
    print("Create new Indexer - async-sample-indexer")
    # [END create_indexer_async]


async def list_indexers():
    """Fetch all indexers from the service and print their count and names."""
    # [START list_indexer_async]
    indexers = await indexers_client.get_indexers()
    joined_names = ", ".join(indexer.name for indexer in indexers)
    print("Found {} Indexers in the service: {}".format(len(indexers), joined_names))
    # [END list_indexer_async]


async def get_indexer():
    """Fetch the sample indexer by name, print a confirmation, and return it."""
    # [START get_indexer_async]
    indexer = await indexers_client.get_indexer("async-sample-indexer")
    print("Retrived Indexer 'async-sample-indexer'")
    return indexer
    # [END get_indexer_async]


async def get_indexer_status():
    """Fetch the status of the sample indexer, print a confirmation, and return it."""
    # [START get_indexer_status_async]
    status = await indexers_client.get_indexer_status("async-sample-indexer")
    print("Retrived Indexer status for 'async-sample-indexer'")
    return status
    # [END get_indexer_status_async]


async def run_indexer():
    """Call ``run_indexer`` on the shared client for the sample indexer and print a confirmation."""
    # [START run_indexer_async]
    indexer_name = "async-sample-indexer"
    await indexers_client.run_indexer(indexer_name)
    print("Ran the Indexer 'async-sample-indexer'")
    # [END run_indexer_async]


async def reset_indexer():
    """Call ``reset_indexer`` on the shared client for the sample indexer and print a confirmation."""
    # [START reset_indexer_async]
    indexer_name = "async-sample-indexer"
    await indexers_client.reset_indexer(indexer_name)
    print("Reset the Indexer 'async-sample-indexer'")
    # [END reset_indexer_async]


async def delete_indexer():
    """Delete the sample indexer through the shared client and print a confirmation."""
    # [START delete_indexer_async]
    indexer_name = "async-sample-indexer"
    await indexers_client.delete_indexer(indexer_name)
    print("Indexer 'async-sample-indexer' successfully deleted")
    # [END delete_indexer_async]


async def main():
    """Run every indexer sample in order, then close the shared client.

    The steps run sequentially: the later ones all operate on the
    "async-sample-indexer" that ``create_indexer`` creates. The module-level
    ``indexers_client`` is closed in a ``finally`` clause so that it is
    released even when one of the steps raises; the exception still
    propagates to the caller.
    """
    try:
        await create_indexer()
        await list_indexers()
        await get_indexer()
        await get_indexer_status()
        await run_indexer()
        await reset_indexer()
        await delete_indexer()
    finally:
        # Always release the client, otherwise a failed step leaves it open.
        await indexers_client.close()


# Run the sample coroutines only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())