File: sample_memory_advanced_async.py

package info (click to toggle)
python-azure 20251118%2Bgit-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 783,356 kB
  • sloc: python: 6,474,533; ansic: 804; javascript: 287; sh: 205; makefile: 198; xml: 109
file content (169 lines) | stat: -rw-r--r-- 7,620 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# pylint: disable=line-too-long,useless-suppression
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------

"""
DESCRIPTION:
    This sample demonstrates how to interact with the memory store to add and retrieve memory
    using the asynchronous AIProjectClient. It uses some additional operations compared
    to the basic memory sample.

    See also /samples/agents/tools/sample_agent_memory_search_async.py that shows
    how to use the Memory Search Tool in a prompt agent.

USAGE:
    python sample_memory_advanced_async.py

    Before running the sample:

    pip install "azure-ai-projects>=2.0.0b1" azure-identity openai python-dotenv aiohttp

    Deploy a chat model (e.g. gpt-4.1) and an embedding model (e.g. text-embedding-3-small).
    Once you have deployed models, set the deployment name in the variables below.

    Set these environment variables with your own values:
    1) AZURE_AI_PROJECT_ENDPOINT - The Azure AI Project endpoint, as found in the Overview
       page of your Microsoft Foundry portal.
    2) AZURE_AI_CHAT_MODEL_DEPLOYMENT_NAME - The deployment name of the chat model, as found under the "Name" column in
       the "Models + endpoints" tab in your Microsoft Foundry project.
    3) AZURE_AI_EMBEDDING_MODEL_DEPLOYMENT_NAME - The deployment name of the embedding model, as found under the
       "Name" column in the "Models + endpoints" tab in your Microsoft Foundry project.
"""

import asyncio
import os
from dotenv import load_dotenv
from azure.core.exceptions import ResourceNotFoundError
from azure.identity.aio import DefaultAzureCredential
from azure.ai.projects.aio import AIProjectClient
from azure.ai.projects.models import (
    MemoryStoreDefaultDefinition,
    MemoryStoreDefaultOptions,
    MemorySearchOptions,
    ResponsesUserMessageItemParam,
    ResponsesAssistantMessageItemParam,
)

load_dotenv()


async def main() -> None:

    endpoint = os.environ["AZURE_AI_PROJECT_ENDPOINT"]

    async with (
        DefaultAzureCredential() as credential,
        AIProjectClient(endpoint=endpoint, credential=credential) as project_client,
    ):

        # Delete memory store, if it already exists
        memory_store_name = "my_memory_store"
        try:
            await project_client.memory_stores.delete(memory_store_name)
            print(f"Memory store `{memory_store_name}` deleted")
        except ResourceNotFoundError:
            pass

        # Create memory store with advanced options
        options = MemoryStoreDefaultOptions(
            user_profile_enabled=True,
            user_profile_details="Preferences and interests relevant to coffee expert agent",
            chat_summary_enabled=True,
        )
        definition = MemoryStoreDefaultDefinition(
            chat_model=os.environ["AZURE_AI_CHAT_MODEL_DEPLOYMENT_NAME"],
            embedding_model=os.environ["AZURE_AI_EMBEDDING_MODEL_DEPLOYMENT_NAME"],
            options=options,
        )
        memory_store = await project_client.memory_stores.create(
            name=memory_store_name,
            description="Example memory store for conversations",
            definition=definition,
        )
        print(f"Created memory store: {memory_store.name} ({memory_store.id}): {memory_store.description}")

        # Set scope to associate the memories with.
        # You can also use "{{$userId}}" to take the oid of the request authentication header.
        scope = "user_123"

        # Extract memories from messages and add them to the memory store
        user_message = ResponsesUserMessageItemParam(
            content="I prefer dark roast coffee and usually drink it in the morning"
        )
        update_poller = await project_client.memory_stores.begin_update_memories(
            name=memory_store.name,
            scope=scope,
            items=[user_message],  # Pass conversation items that you want to add to memory
            update_delay=300  # Keep default inactivity delay before starting update
        )
        print(
            f"Scheduled memory update operation (Update ID: {update_poller.update_id}, Status: {update_poller.status()})"
        )

        # Extend the previous update with another update and more messages
        new_message = ResponsesUserMessageItemParam(content="I also like cappuccinos in the afternoon")
        new_update_poller = await project_client.memory_stores.begin_update_memories(
            name=memory_store.name,
            scope=scope,
            items=[new_message],
            previous_update_id=update_poller.update_id,  # Extend from previous update ID
            update_delay=0,  # Trigger update immediately without waiting for inactivity
        )
        print(
            f"Scheduled memory update operation (Update ID: {new_update_poller.update_id}, Status: {new_update_poller.status()})"
        )

        # As first update has not started yet, the new update will cancel the first update and cover both sets of messages
        print(
            f"Superseded first memory update operation (Update ID: {update_poller.update_id}, Status: {update_poller.status()})"
        )

        new_update_result = await new_update_poller.result()
        print(
            f"Second update {new_update_poller.update_id} completed with {len(new_update_result.memory_operations)} memory operations"
        )
        for operation in new_update_result.memory_operations:
            print(
                f"  - Operation: {operation.kind}, Memory ID: {operation.memory_item.memory_id}, Content: {operation.memory_item.content}"
            )

        # Retrieve memories from the memory store
        query_message = ResponsesUserMessageItemParam(content="What are my morning coffee preferences?")
        search_response = await project_client.memory_stores.search_memories(
            name=memory_store.name, scope=scope, items=[query_message], options=MemorySearchOptions(max_memories=5)
        )
        print(f"Found {len(search_response.memories)} memories")
        for memory in search_response.memories:
            print(f"  - Memory ID: {memory.memory_item.memory_id}, Content: {memory.memory_item.content}")

        # Perform another search using the previous search as context
        agent_message = ResponsesAssistantMessageItemParam(
            content="You previously indicated a preference for dark roast coffee in the morning."
        )
        followup_query = ResponsesUserMessageItemParam(
            content="What about afternoon?"  # Follow-up assuming context from previous messages
        )
        followup_search_response = await project_client.memory_stores.search_memories(
            name=memory_store.name,
            scope=scope,
            items=[agent_message, followup_query],
            previous_search_id=search_response.search_id,
            options=MemorySearchOptions(max_memories=5),
        )
        print(f"Found {len(followup_search_response.memories)} memories")
        for memory in followup_search_response.memories:
            print(f"  - Memory ID: {memory.memory_item.memory_id}, Content: {memory.memory_item.content}")

        # Delete memories for the current scope
        await project_client.memory_stores.delete_scope(name=memory_store.name, scope=scope)
        print(f"Deleted memories for scope '{scope}'")

        # Delete memory store
        await project_client.memory_stores.delete(memory_store.name)
        print(f"Deleted memory store `{memory_store.name}`")


if __name__ == "__main__":
    asyncio.run(main())