1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
|
"""Aggregation operators."""
from __future__ import annotations
import asyncio
import builtins
import operator as op
from typing import AsyncIterator, Awaitable, Callable, TypeVar, AsyncIterable, cast
from . import select
from ..aiter_utils import anext
from ..core import pipable_operator, streamcontext
# Public operators exported by this module.
__all__ = ["accumulate", "reduce", "list"]
# Generic type of the items handled by the operators defined below.
T = TypeVar("T")
@pipable_operator
async def accumulate(
    source: AsyncIterable[T],
    func: Callable[[T, T], Awaitable[T] | T] = op.add,
    initializer: T | None = None,
) -> AsyncIterator[T]:
    """Yield the running results of folding ``func`` over an
    asynchronous sequence (running sums by default).

    The first value produced is the seed: ``initializer`` when it is not
    ``None``, otherwise the first item of ``source``. Every following
    value is ``func(previous, item)`` for each remaining item.

    If ``initializer`` is ``None`` and ``source`` is empty, nothing is
    produced. Note that ``None`` itself cannot be used as an initializer,
    since it is treated as "no initializer".

    ``func`` may be a coroutine function, in which case its result is
    awaited. The detection relies on ``asyncio.iscoroutinefunction``, so
    a plain callable that merely returns an awaitable is not awaited.
    """
    is_async_func = asyncio.iscoroutinefunction(func)
    async with streamcontext(source) as streamer:
        # Pick the seed: explicit initializer first, else the first item
        if initializer is not None:
            total = initializer
        else:
            try:
                total = await anext(streamer)
            except StopAsyncIteration:
                # Empty source and no initializer: produce nothing
                return

        # The seed is always the first value produced
        yield total

        # Fold the remaining items, producing every intermediate result
        async for element in streamer:
            outcome = func(total, element)
            total = (
                (await cast("Awaitable[T]", outcome))
                if is_async_func
                else cast("T", outcome)
            )
            yield total
@pipable_operator
def reduce(
    source: AsyncIterable[T],
    func: Callable[[T, T], Awaitable[T] | T],
    initializer: T | None = None,
) -> AsyncIterator[T]:
    """Reduce an asynchronous sequence to a single value by applying
    a function of two arguments cumulatively to its items.

    If ``initializer`` is present, it is placed before the items
    of the sequence in the calculation, and serves as a default when the
    sequence is empty.

    The result is built by running ``accumulate`` over the source and
    selecting the item at index ``-1`` of the accumulated sequence.
    """
    return select.item.raw(accumulate.raw(source, func, initializer), -1)
@pipable_operator
async def list(source: AsyncIterable[T]) -> AsyncIterator[builtins.list[T]]:
    """Collect the items of an asynchronous sequence into a list.

    Every intermediate step is produced: the empty list first, then the
    list again after each item has been appended.

    This operator can be used to easily convert a stream into a list::

        lst = await stream.list(x)

    .. note:: The same list object is produced at each step in order to
       avoid memory copies.
    """
    collected: builtins.list[T] = []
    # The empty list is produced before the source is entered
    yield collected
    async with streamcontext(source) as streamer:
        async for element in streamer:
            collected.append(element)
            # Same object every time; it grows in place
            yield collected
|