File: realtime.py

package info (click to toggle)
python-openai 2.10.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 8,528 kB
  • sloc: python: 68,384; sh: 161; makefile: 7
file content (54 lines) | stat: -rwxr-xr-x 1,902 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#!/usr/bin/env rye run python
import asyncio

from openai import AsyncOpenAI

# Azure OpenAI Realtime Docs

# How-to: https://learn.microsoft.com/azure/ai-services/openai/how-to/realtime-audio
# Supported models and API versions: https://learn.microsoft.com/azure/ai-services/openai/how-to/realtime-audio#supported-models
# Entra ID auth: https://learn.microsoft.com/azure/ai-services/openai/how-to/managed-identity


async def main() -> None:
    """The following example demonstrates how to configure OpenAI to use the Realtime API.
    For an audio example, see push_to_talk_app.py and update the client and model parameter accordingly.

    When prompted for user input, type a message and hit enter to send it to the model.
    Enter "q" to quit the conversation.
    """

    client = AsyncOpenAI()
    async with client.realtime.connect(
        model="gpt-realtime",
    ) as connection:
        await connection.session.update(
            session={
                "output_modalities": ["text"],
                "model": "gpt-realtime",
                "type": "realtime",
            }
        )
        while True:
            user_input = input("Enter a message: ")
            if user_input == "q":
                break

            await connection.conversation.item.create(
                item={
                    "type": "message",
                    "role": "user",
                    "content": [{"type": "input_text", "text": user_input}],
                }
            )
            await connection.response.create()
            async for event in connection:
                if event.type == "response.output_text.delta":
                    print(event.delta, flush=True, end="")
                elif event.type == "response.output_text.done":
                    print()
                elif event.type == "response.done":
                    break


asyncio.run(main())