File: sample_agents_stream_file_search.py

package info (click to toggle)
python-azure 20250829%2Bgit-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 756,824 kB
  • sloc: python: 6,224,989; ansic: 804; javascript: 287; makefile: 198; sh: 195; xml: 109
file content (182 lines) | stat: -rw-r--r-- 7,874 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------

"""
DESCRIPTION:
    This sample demonstrates how to use agent operations with file searching from
    the Azure Agents service using a synchronous client.

USAGE:
    python sample_agents_stream_file_search.py

    Before running the sample:

    pip install azure-ai-projects azure-ai-agents azure-identity

    Set these environment variables with your own values:
    1) PROJECT_ENDPOINT - The Azure AI Project endpoint, as found in the Overview
                          page of your Azure AI Foundry portal.
    2) MODEL_DEPLOYMENT_NAME - The deployment name of the AI model, as found under the "Name" column in
       the "Models + endpoints" tab in your Azure AI Foundry project.
"""

import os
from azure.ai.projects import AIProjectClient
from azure.ai.agents.models import (
    AgentStreamEvent,
    FilePurpose,
    FileSearchTool,
    ListSortOrder,
    MessageDeltaChunk,
    MessageDeltaTextContent,
    MessageDeltaTextFileCitationAnnotation,
    MessageDeltaTextFileCitationAnnotationObject,
    RunAdditionalFieldList,
    RunStep,
    RunStepDeltaChunk,
    RunStepFileSearchToolCall,
    RunStepToolCallDetails,
    ThreadMessage,
    ThreadRun,
)
from azure.identity import DefaultAzureCredential

asset_file_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "../assets/product_info_1.md"))

project_client = AIProjectClient(
    endpoint=os.environ["PROJECT_ENDPOINT"],
    credential=DefaultAzureCredential(),
)

with project_client:
    agents_client = project_client.agents

    # Upload file and create vector store
    file = agents_client.files.upload_and_poll(file_path=asset_file_path, purpose=FilePurpose.AGENTS)
    print(f"Uploaded file, file ID: {file.id}")

    vector_store = agents_client.vector_stores.create_and_poll(file_ids=[file.id], name="my_vectorstore")
    print(f"Created vector store, vector store ID: {vector_store.id}")

    # Create file search tool with resources followed by creating agent
    file_search = FileSearchTool(vector_store_ids=[vector_store.id])

    agent = agents_client.create_agent(
        model=os.environ["MODEL_DEPLOYMENT_NAME"],
        name="my-agent",
        instructions="Hello, you are helpful agent and can search information from uploaded files",
        tools=file_search.definitions,
        tool_resources=file_search.resources,
    )

    print(f"Created agent, ID: {agent.id}")

    # Create thread for communication
    thread = agents_client.threads.create()
    print(f"Created thread, ID: {thread.id}")

    # Create message to thread
    message = agents_client.messages.create(
        thread_id=thread.id, role="user", content="Hello, what Contoso products do you know?"
    )
    print(f"Created message, ID: {message.id}")

    references = {}
    with agents_client.runs.stream(
        thread_id=thread.id, agent_id=agent.id, include=[RunAdditionalFieldList.FILE_SEARCH_CONTENTS]
    ) as stream:

        for event_type, event_data, _ in stream:
            if isinstance(event_data, MessageDeltaChunk):
                text = event_data.text
                if event_data.delta.content and isinstance(event_data.delta.content[0], MessageDeltaTextContent):
                    delta_text_content = event_data.delta.content[0]
                    if delta_text_content.text and delta_text_content.text.annotations:
                        for delta_annotation in delta_text_content.text.annotations:
                            if (
                                isinstance(delta_annotation, MessageDeltaTextFileCitationAnnotation)
                                and isinstance(
                                    delta_annotation.file_citation, MessageDeltaTextFileCitationAnnotationObject
                                )
                                and delta_annotation.file_citation.file_id
                                and delta_annotation.text
                            ):
                                citation = (
                                    os.path.split(asset_file_path)[-1]
                                    if delta_annotation.file_citation.file_id == file.id
                                    else delta_annotation.file_citation.file_id
                                )
                                references[delta_annotation.text] = citation
                                print(f"File citation delta received: [{citation}]")
                for ref, citation in references.items():
                    text = text.replace(ref, f" [{citation}]")
                print(f"Text delta received: {text}")

            elif isinstance(event_data, RunStepDeltaChunk):
                print(f"RunStepDeltaChunk received. ID: {event_data.id}.")

            elif isinstance(event_data, ThreadMessage):
                print(f"ThreadMessage created. ID: {event_data.id}, Status: {event_data.status}")

            elif isinstance(event_data, ThreadRun):
                print(f"ThreadRun status: {event_data.status}")

                if event_data.status == "failed":
                    print(f"Run failed. Error: {event_data.last_error}")

            elif isinstance(event_data, RunStep):
                print(f"RunStep type: {event_data.type}, Status: {event_data.status}")
                if isinstance(event_data.step_details, RunStepToolCallDetails):
                    for tool_call in event_data.step_details.tool_calls:
                        if (
                            isinstance(tool_call, RunStepFileSearchToolCall)
                            and tool_call.file_search
                            and tool_call.file_search.results
                            and tool_call.file_search.results[0].content
                            and tool_call.file_search.results[0].content[0].text
                        ):
                            print(
                                "The search tool has found the next relevant content in "
                                f"the file {tool_call.file_search.results[0].file_name}:"
                            )
                            # Note: technically we may have several search results, however in our example
                            # we only have one file, so we are taking the only result.
                            print(tool_call.file_search.results[0].content[0].text)
                            print("===============================================================")

            elif event_type == AgentStreamEvent.ERROR:
                print(f"An error occurred. Data: {event_data}")

            elif event_type == AgentStreamEvent.DONE:
                print("Stream completed.")

            else:
                print(f"Unhandled Event Type: {event_type}, Data: {event_data}")

    # Delete the file when done
    agents_client.vector_stores.delete(vector_store.id)
    print("Deleted vector store")

    agents_client.files.delete(file_id=file.id)
    print("Deleted file")

    # Delete the agent when done
    agents_client.delete_agent(agent.id)
    print("Deleted agent")

    # Fetch and log all messages
    messages = agents_client.messages.list(thread_id=thread.id, order=ListSortOrder.ASCENDING)

    # Print last messages from the thread
    file_name = os.path.split(asset_file_path)[-1]
    for msg in messages:
        if msg.text_messages:
            last_text = msg.text_messages[-1].text.value
            for annotation in msg.text_messages[-1].text.annotations:
                citation = (
                    file_name if annotation.file_citation.file_id == file.id else annotation.file_citation.file_id
                )
                last_text = last_text.replace(annotation.text, f" [{citation}]")
            print(f"{msg.role}: {last_text}")