File: helpers.py

package info (click to toggle)
anthropic-sdk-python 0.75.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 4,252 kB
  • sloc: python: 29,737; sh: 177; makefile: 5
file content (28 lines) | stat: -rw-r--r-- 879 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from __future__ import annotations

import os
from typing import TypeVar, Iterator
from typing_extensions import AsyncIterator

_T = TypeVar("_T")


def load_fixture(fixture_name: str) -> str:
    """Load a fixture file from the fixtures directory."""
    current_dir = os.path.dirname(os.path.abspath(__file__))
    fixtures_dir = os.path.join(current_dir, "fixtures")
    with open(os.path.join(fixtures_dir, fixture_name), "r") as f:
        return f.read()


def get_response(fixture_name: str) -> Iterator[bytes]:
    """Convert a fixture file into a stream of bytes for testing."""
    content = load_fixture(fixture_name)
    for line in content.splitlines():
        yield line.encode() + b"\n"


async def to_async_iter(iter: Iterator[_T]) -> AsyncIterator[_T]:
    """Convert a synchronous iterator to an asynchronous one."""
    for event in iter:
        yield event