File: generators.py

package info (click to toggle)
python-utils 3.9.1-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 396 kB
  • sloc: python: 2,135; makefile: 19; sh: 5
file content (126 lines) | stat: -rw-r--r-- 3,808 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
"""
This module provides generator utilities for batching items from
iterables and async iterables.

Functions:
    abatcher(generator, batch_size=None, interval=None):
        Asyncio generator wrapper that returns items with a given batch
        size or interval (whichever is reached first).

    batcher(iterable, batch_size=10):
        Generator wrapper that returns items with a given batch size.
"""

import asyncio
import time

import python_utils
from python_utils import types

# Type variable for the items that flow through the batching helpers below.
_T = types.TypeVar('_T')


async def abatcher(
    generator: types.Union[
        types.AsyncGenerator[_T, None],
        types.AsyncIterator[_T],
    ],
    batch_size: types.Optional[int] = None,
    interval: types.Optional[types.delta_type] = None,
) -> types.AsyncGenerator[types.List[_T], None]:
    """
    Asyncio generator wrapper that returns items with a given batch size or
    interval (whichever is reached first).

    Items are requested from ``generator`` one at a time through a task that
    wraps ``generator.__anext__()``. The interval is only checked after an
    item arrived or after waiting for the next item timed out, so a batch
    can be yielded somewhat later than ``interval`` after the previous one.
    Empty batches are never yielded. When the source is exhausted, the
    remaining items (if any) are yielded as a final, possibly smaller batch.

    If iteration ends early (the consumer closes this generator, the
    surrounding task is cancelled, or an error propagates), the outstanding
    request for the next item is cancelled instead of being left running.

    Args:
        generator: The async generator or iterator to batch.
        batch_size (types.Optional[int], optional): The number of items per
            batch. Defaults to None.
        interval (types.Optional[types.delta_type], optional): The time
            interval to wait before yielding a batch. Defaults to None.

    Yields:
        types.AsyncGenerator[types.List[_T], None]: A generator that yields
        batches of items.

    Raises:
        AssertionError: If neither ``batch_size`` nor ``interval`` is given.
            Because this is an async generator, the check runs when the
            first batch is requested, and only if assertions are enabled.
    """
    assert batch_size or interval, 'Must specify either batch_size or interval'

    # If interval is specified, use it to determine when to yield the batch
    # Alternatively set a really long timeout to keep the code simpler
    if interval:
        interval_s = python_utils.delta_to_seconds(interval)
    else:
        # Set the timeout to 10 years
        interval_s = 60 * 60 * 24 * 365 * 10.0

    batch: types.List[_T] = []
    next_yield: float = time.perf_counter() + interval_s

    done: types.Set[asyncio.Task[_T]]
    pending: types.Set[asyncio.Task[_T]] = set()

    try:
        while True:
            # Only request a new item when the previous request finished; a
            # request that merely timed out is still in `pending` and is
            # waited on again. The task is stored *before* awaiting so the
            # `finally` below can reach it if the wait itself is cancelled.
            if not pending:
                pending = {
                    asyncio.create_task(
                        types.cast(
                            # Quoted so the generic alias is not rebuilt at
                            # runtime for every single item.
                            'types.Coroutine[None, None, _T]',
                            generator.__anext__(),
                        )
                    ),
                }

            done, pending = await asyncio.wait(
                pending,
                timeout=interval_s,
                return_when=asyncio.FIRST_COMPLETED,
            )

            try:
                # `done` is empty when the wait timed out
                for task in done:
                    batch.append(task.result())
            except StopAsyncIteration:
                # Source exhausted: flush whatever is left and stop
                if batch:
                    yield batch

                break

            # The `batch` guard keeps a falsy batch_size (e.g. 0 combined
            # with an interval) from producing empty batches
            if batch and len(batch) == batch_size:
                yield batch
                batch = []

            if interval and batch and time.perf_counter() > next_yield:
                yield batch
                batch = []
                # Always set the next yield time to the current time. If the
                # loop is running slow due to blocking functions we do not
                # want to burst too much
                next_yield = time.perf_counter() + interval_s
    finally:
        # Do not leave a dangling `__anext__` task behind when we stop before
        # the source is exhausted; it would otherwise keep running and
        # silently consume the next item.
        for task in pending:
            task.cancel()


def batcher(
    iterable: types.Iterable[_T],
    batch_size: int = 10,
) -> types.Generator[types.List[_T], None, None]:
    """
    Group the items of an iterable into lists of ``batch_size`` items.

    The iterable is consumed lazily, one item at a time. Every yielded list
    is a fresh object, and the last one may hold fewer than ``batch_size``
    items when the input does not divide evenly. Nothing is yielded for an
    empty iterable.

    Args:
        iterable (types.Iterable[_T]): The iterable to batch.
        batch_size (int, optional): The number of items per batch. Defaults
            to 10.

    Yields:
        types.Generator[types.List[_T], None, None]: A generator that yields
        batches of items.
    """
    chunk: types.List[_T] = []

    for element in iterable:
        chunk.append(element)

        # Keep collecting until the chunk holds exactly batch_size items
        if len(chunk) != batch_size:
            continue

        yield chunk
        chunk = []

    # Hand out the trailing, partially filled chunk
    if chunk:
        yield chunk