# Example: defining custom aiostream operators and composing them in a pipeline.
import asyncio
import random as random_module
from typing import AsyncIterable, AsyncIterator
from aiostream import operator, pipable_operator, pipe, streamcontext
@operator
async def random(
    offset: float = 0.0, width: float = 1.0, interval: float = 0.1
) -> AsyncIterator[float]:
    """Produce an endless stream of pseudo-random floats.

    Before each item the generator sleeps for ``interval`` seconds, then
    yields ``offset + width * r`` where ``r`` is a fresh draw from the
    standard library's ``random.random()``.
    """
    while True:
        # Pace the stream: one value per ``interval`` seconds.
        await asyncio.sleep(interval)
        sample = random_module.random()
        yield offset + width * sample
@pipable_operator
async def power(
    source: AsyncIterable[float], exponent: float | int
) -> AsyncIterator[float]:
    """Yield every element of ``source`` raised to ``exponent``.

    The source is iterated inside a ``streamcontext`` block, and each
    incoming value is re-emitted as ``value ** exponent``.
    """
    async with streamcontext(source) as stream:
        async for value in stream:
            # Two-argument pow() is the function form of the ** operator.
            yield pow(value, exponent)
@pipable_operator
def square(source: AsyncIterable[float]) -> AsyncIterator[float]:
    """Return an asynchronous sequence of the squares of ``source``'s elements.

    This is a thin specialization of ``power``: it delegates to
    ``power.raw`` with the exponent fixed at 2.
    """
    squared = power.raw(source, 2)
    return squared
async def main() -> None:
    """Assemble the demo pipeline stage by stage and print its awaited result."""
    numbers = random()  # Stream random numbers
    squares = numbers | square.pipe()  # Square the values
    first_five = squares | pipe.take(5)  # Take the first five
    totals = first_five | pipe.accumulate()  # Sum the values
    # Awaiting the composed stream runs it; whatever it resolves to is printed.
    print(await totals)
# Run the main coroutine only when this file is executed as a script, so that
# importing the module (e.g. to reuse the operators defined above) does not
# start an event loop as a side effect.
if __name__ == "__main__":
    asyncio.run(main())
|