"""Transformation operators."""
from __future__ import annotations
import asyncio
import itertools
from typing import (
Protocol,
TypeVar,
AsyncIterable,
AsyncIterator,
Awaitable,
cast,
)
from ..core import streamcontext, pipable_operator
from . import select
from . import create
from . import aggregate
from .combine import map, amap, smap
# Public API of this module.
__all__ = ["map", "enumerate", "starmap", "cycle", "chunks"]
# map, amap and smap are also transform operators
# (re-exported from .combine; this bare expression marks them as used
# so linters don't flag the import).
map, amap, smap
# Generic type variables for source items (T) and mapped results (U).
T = TypeVar("T")
U = TypeVar("U")
@pipable_operator
async def enumerate(
    source: AsyncIterable[T], start: int = 0, step: int = 1
) -> AsyncIterator[tuple[int, T]]:
    """Generate ``(index, value)`` tuples from an asynchronous sequence.

    The index begins at ``start`` (default ``0``) and is incremented by
    ``step`` (default ``1``) for each item produced by the source.
    """
    index = start
    async with streamcontext(source) as streamer:
        async for value in streamer:
            yield index, value
            index += step
# Variance-annotated type variables for the callable protocols below:
# arguments are contravariant (X), results covariant (Y), matching how
# callables behave under subtyping.
X = TypeVar("X", contravariant=True)
Y = TypeVar("Y", covariant=True)
class AsyncStarmapCallable(Protocol[X, Y]):
    """Structural type for an asynchronous star-callable.

    Matches any callable taking one or more positional arguments of
    type ``X`` and returning an awaitable producing ``Y``.
    """

    def __call__(self, arg: X, /, *args: X) -> Awaitable[Y]:
        ...
class SyncStarmapCallable(Protocol[X, Y]):
    """Structural type for a synchronous star-callable.

    Matches any callable taking one or more positional arguments of
    type ``X`` and returning ``Y`` directly.
    """

    def __call__(self, arg: X, /, *args: X) -> Y:
        ...
@pipable_operator
def starmap(
    source: AsyncIterable[tuple[T, ...]],
    func: SyncStarmapCallable[T, U] | AsyncStarmapCallable[T, U],
    ordered: bool = True,
    task_limit: int | None = None,
) -> AsyncIterator[U]:
    """Apply a function to the unpacked elements of an asynchronous sequence.

    Each item of the source is expected to be a tuple, which is unpacked
    into positional arguments for ``func``. The given function can either
    be synchronous or asynchronous.

    For an asynchronous function, the results can be returned in or out
    of order depending on the ``ordered`` argument, and the amount of
    concurrently running coroutines can be bounded with ``task_limit``
    (a value of ``1`` makes them run sequentially). Both arguments are
    ignored when the function is synchronous.
    """
    # Synchronous case: simply unpack the tuple before calling the function.
    if not asyncio.iscoroutinefunction(func):
        sync_fn = cast("SyncStarmapCallable[T, U]", func)

        def unpack_sync(item: tuple[T, ...], *_: object) -> U:
            return sync_fn(*item)

        return smap.raw(source, unpack_sync)

    # Asynchronous case: unpack, then await the resulting awaitable,
    # delegating concurrency control to amap.
    async_fn = cast("AsyncStarmapCallable[T, U]", func)

    async def unpack_async(item: tuple[T, ...], *_: object) -> U:
        return await async_fn(*item)

    return amap.raw(source, unpack_async, ordered=ordered, task_limit=task_limit)
@pipable_operator
async def cycle(source: AsyncIterable[T]) -> AsyncIterator[T]:
    """Iterate indefinitely over an asynchronous sequence.

    Note: no buffering is performed; the same source is simply
    re-iterated. If the source is not re-iterable, the generator may
    loop forever without yielding a single item.
    """
    while True:
        async with streamcontext(source) as streamer:
            async for value in streamer:
                yield value
            # An empty source would otherwise make this an unyielding
            # busy loop; give control back to the event loop each pass.
            await asyncio.sleep(0)
@pipable_operator
async def chunks(source: AsyncIterable[T], n: int) -> AsyncIterator[list[T]]:
    """Generate chunks of size ``n`` from an asynchronous sequence.

    Each chunk is a list; the final chunk may hold fewer than ``n``
    elements.
    """
    async with streamcontext(source) as streamer:
        async for head in streamer:
            # Pull up to n-1 further items from the same (preserved)
            # streamer to complete the chunk started by ``head``.
            tail = select.take(create.preserve(streamer), n - 1)
            chunk = [head]
            chunk.extend(await aggregate.list(tail))
            yield chunk