File: sample_indexer_datasource_skillset.py

package info (click to toggle)
python-azure 20250603%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 851,724 kB
  • sloc: python: 7,362,925; ansic: 804; javascript: 287; makefile: 195; sh: 145; xml: 109
file content (152 lines) | stat: -rw-r--r-- 6,103 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
"""
FILE: sample_indexer_datasource_skillset.py
DESCRIPTION:
    This sample demonstrates use an indexer, datasource and skillset together.

    Indexer is used to efficiently write data to an index using a datasource.
    So we first identify a supported data source - we use azure storage blobs
    in this example. Then we create an index which is compatible with the datasource.
    Further, we create an azure cognitive search datasource which we require to finally
    create an indexer.

    Additionally, we will also use skillsets to provide some AI enhancements in our indexers.

    Once we create the indexer, we run the indexer and perform some basic operations like getting
    the indexer status.

    The datasource used in this sample is stored as metadata for empty blobs in "searchcontainer".
    The json file can be found in samples/files folder named hotel_small.json has the metdata of
    each blob.
USAGE:
    python sample_indexer_datasource_skillset.py

    Set the environment variables with your own values before running the sample:
    1) AZURE_SEARCH_SERVICE_ENDPOINT - the endpoint of your Azure Cognitive Search service
    2) AZURE_SEARCH_API_KEY - your search API key
    3) AZURE_STORAGE_CONNECTION_STRING - The connection string for the storage blob account that is
    being used to create the datasource.
"""

import os
import datetime

service_endpoint = os.environ["AZURE_SEARCH_SERVICE_ENDPOINT"]
key = os.environ["AZURE_SEARCH_API_KEY"]
connection_string = os.environ["AZURE_STORAGE_CONNECTION_STRING"]

from azure.core.credentials import AzureKeyCredential
from azure.search.documents.indexes.models import (
    SearchIndexerDataContainer,
    SearchIndex,
    SearchIndexer,
    SimpleField,
    SearchFieldDataType,
    EntityRecognitionSkill,
    InputFieldMappingEntry,
    OutputFieldMappingEntry,
    SearchIndexerSkillset,
    CorsOptions,
    IndexingSchedule,
    SearchableField,
    IndexingParameters,
    SearchIndexerDataSourceConnection,
    IndexingParametersConfiguration,
)
from azure.search.documents.indexes import SearchIndexerClient, SearchIndexClient


def _create_index():
    name = "hotel-index"

    # Here we create an index with listed fields.
    fields = [
        SimpleField(name="hotelId", type=SearchFieldDataType.String, filterable=True, sortable=True, key=True),
        SearchableField(name="hotelName", type=SearchFieldDataType.String),
        SimpleField(name="description", type=SearchFieldDataType.String),
        SimpleField(name="descriptionFr", type=SearchFieldDataType.String),
        SimpleField(name="category", type=SearchFieldDataType.String),
        SimpleField(name="parkingIncluded", type=SearchFieldDataType.Boolean, filterable=True),
        SimpleField(name="smokingAllowed", type=SearchFieldDataType.Boolean, filterable=True),
        SimpleField(name="lastRenovationDate", type=SearchFieldDataType.String),
        SimpleField(name="rating", type=SearchFieldDataType.Int64, sortable=True),
        SimpleField(name="location", type=SearchFieldDataType.GeographyPoint),
    ]
    cors_options = CorsOptions(allowed_origins=["*"], max_age_in_seconds=60)

    # pass in the name, fields and cors options and create the index
    index = SearchIndex(name=name, fields=fields, cors_options=cors_options)
    index_client = SearchIndexClient(service_endpoint, AzureKeyCredential(key))
    result = index_client.create_index(index)
    return result


def _create_datasource():
    # Here we create a datasource. As mentioned in the description we have stored it in
    # "searchcontainer"
    ds_client = SearchIndexerClient(service_endpoint, AzureKeyCredential(key))
    container = SearchIndexerDataContainer(name="searchcontainer")
    data_source_connection = SearchIndexerDataSourceConnection(
        name="hotel-datasource", type="azureblob", connection_string=connection_string, container=container
    )
    data_source = ds_client.create_data_source_connection(data_source_connection)
    return data_source


def _create_skillset():
    client = SearchIndexerClient(service_endpoint, AzureKeyCredential(key))
    inp = InputFieldMappingEntry(name="text", source="/document/lastRenovationDate")
    output = OutputFieldMappingEntry(name="dateTimes", target_name="RenovatedDate")
    s = EntityRecognitionSkill(name="merge-skill", inputs=[inp], outputs=[output])

    skillset = SearchIndexerSkillset(name="hotel-data-skill", skills=[s], description="example skillset")
    result = client.create_skillset(skillset)
    return result


def sample_indexer_workflow():
    # Now that we have a datasource and an index, we can create an indexer.

    skillset_name = _create_skillset().name
    print("Skillset is created")

    ds_name = _create_datasource().name
    print("Data source is created")

    ind_name = _create_index().name
    print("Index is created")

    # we pass the data source, skillsets and targeted index to build an indexer
    configuration = IndexingParametersConfiguration(parsing_mode="jsonArray", query_timeout=None)  # type: ignore
    parameters = IndexingParameters(configuration=configuration)
    indexer = SearchIndexer(
        name="hotel-data-indexer",
        data_source_name=ds_name,
        target_index_name=ind_name,
        skillset_name=skillset_name,
        parameters=parameters,
    )

    indexer_client = SearchIndexerClient(service_endpoint, AzureKeyCredential(key))
    indexer_client.create_indexer(indexer)  # create the indexer

    # to get an indexer
    result = indexer_client.get_indexer("hotel-data-indexer")
    print(result)

    # To run an indexer, we can use run_indexer()
    indexer_client.run_indexer(result.name)

    # Using create or update to schedule an indexer

    schedule = IndexingSchedule(interval=datetime.timedelta(hours=24))
    result.schedule = schedule
    updated_indexer = indexer_client.create_or_update_indexer(result)

    print(updated_indexer)

    # get the status of an indexer
    indexer_client.get_indexer_status(updated_indexer.name)


if __name__ == "__main__":
    sample_indexer_workflow()