1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
|
"""Time-specific operators."""
from __future__ import annotations
import asyncio
from ..aiter_utils import anext
from ..core import streamcontext, pipable_operator
from typing import TypeVar, AsyncIterable, AsyncIterator
__all__ = ["spaceout", "delay", "timeout"]
T = TypeVar("T")
@pipable_operator
async def spaceout(source: AsyncIterable[T], interval: float) -> AsyncIterator[T]:
"""Make sure the elements of an asynchronous sequence are separated
in time by the given interval.
"""
timeout = 0.0
loop = asyncio.get_event_loop()
async with streamcontext(source) as streamer:
async for item in streamer:
delta = timeout - loop.time()
delay = delta if delta > 0 else 0.0
await asyncio.sleep(delay)
yield item
timeout = loop.time() + interval
@pipable_operator
async def timeout(source: AsyncIterable[T], timeout: float) -> AsyncIterator[T]:
"""Raise a time-out if an element of the asynchronous sequence
takes too long to arrive.
Note: the timeout is not global but specific to each step of
the iteration.
"""
async with streamcontext(source) as streamer:
while True:
try:
item = await asyncio.wait_for(anext(streamer), timeout)
except StopAsyncIteration:
break
else:
yield item
@pipable_operator
async def delay(source: AsyncIterable[T], delay: float) -> AsyncIterator[T]:
"""Delay the iteration of an asynchronous sequence."""
await asyncio.sleep(delay)
async with streamcontext(source) as streamer:
async for item in streamer:
yield item
|