"""Example: consume a reactivex Observable from asyncio code.

``to_async_generator`` turns an observable into an awaitable pull-style API,
and ``go`` / ``main`` demonstrate it on a short range of integers.
"""
import asyncio
from asyncio import Future
from collections import deque
from typing import Any, Callable, Coroutine, List, TypeVar

import reactivex
from reactivex import Notification, Observable
from reactivex import operators as ops
from reactivex.scheduler.eventloop import AsyncIOScheduler
# Type variable used in the annotations of the source observable (Observable[_T]).
_T = TypeVar("_T")
def to_async_generator(sentinel: Any = None) -> Coroutine[Any, Any, Future[Any]]:
loop = asyncio.get_event_loop()
future = loop.create_future()
notifications: List[Notification[Any]] = []
def _to_async_generator(source: Observable[_T]):
def feeder():
nonlocal future
if not notifications or future.done():
return
notification = notifications.pop(0)
if notification.kind == "E":
future.set_exception(notification.exception)
elif notification.kind == "C":
future.set_result(sentinel)
else:
future.set_result(notification.value)
def on_next(value: _T) -> None:
"""Takes on_next values and appends them to the notification queue"""
notifications.append(value)
loop.call_soon(feeder)
source.pipe(ops.materialize()).subscribe(on_next)
async def gen():
"""Generator producing futures"""
nonlocal future
loop.call_soon(feeder)
future = Future()
return future
return gen
return _to_async_generator
async def go(loop):
    """Pull items from a ten-element observable and print them one by one.

    Builds an observable over ``range(10)`` scheduled on an ``AsyncIOScheduler``
    bound to ``loop``, pipes it through ``to_async_generator()``, then awaits
    the resulting function repeatedly, printing each result until one is
    ``None``.

    Args:
        loop: Event loop handed to ``AsyncIOScheduler``.
    """
    source = reactivex.from_(list(range(10)), scheduler=AsyncIOScheduler(loop))
    next_item = source.pipe(to_async_generator())

    # Wish we could write something like:
    #   ys = (x for x in yield from gen())
    # Instead, await one item at a time and stop at the first None result.
    while (item := await next_item()) is not None:
        print(item)
def main():
    """Run the ``go`` demo to completion on a dedicated event loop."""
    # asyncio.get_event_loop() is deprecated when no loop is running and raises
    # RuntimeError on Python 3.14+, so create the loop explicitly.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(go(loop))
    finally:
        # Release the loop's resources even if the demo raises.
        loop.close()
# Run the demo only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|