1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
|
import pytest
import asyncio
from aiostream import stream, pipe
from aiostream.test_utils import assert_run, event_loop
# Pytest fixtures
assert_run, event_loop
@pytest.mark.asyncio
async def test_just(assert_run):
value = 3
xs = stream.just(value)
await assert_run(xs, [3])
async def four():
return 4
xs = stream.just(four())
await assert_run(xs, [4])
@pytest.mark.asyncio
async def test_call(assert_run):
def myfunc(a, b, c=0, d=4):
return a, b, c, d
xs = stream.call(myfunc, 1, 2, c=3)
await assert_run(xs, [(1, 2, 3, 4)])
async def myasyncfunc(a, b, c=0, d=4):
return a, b, c, d
xs = stream.call(myasyncfunc, 1, 2, c=3)
await assert_run(xs, [(1, 2, 3, 4)])
@pytest.mark.asyncio
async def test_throw(assert_run):
exception = RuntimeError("Oops")
xs = stream.throw(exception)
await assert_run(xs, [], exception)
@pytest.mark.asyncio
async def test_empty(assert_run):
xs = stream.empty()
await assert_run(xs, [])
@pytest.mark.asyncio
async def test_never(assert_run, event_loop):
xs = stream.never() | pipe.timeout(30.0)
await assert_run(xs, [], asyncio.TimeoutError())
assert event_loop.steps == [30.0]
@pytest.mark.asyncio
async def test_repeat(assert_run):
xs = stream.repeat(1, 3)
await assert_run(xs, [1, 1, 1])
xs = stream.repeat(2)[:4]
await assert_run(xs, [2, 2, 2, 2])
@pytest.mark.asyncio
async def test_range(assert_run, event_loop):
xs = stream.range(3, 10, 2, interval=1.0)
await assert_run(xs, [3, 5, 7, 9])
assert event_loop.steps == [1, 1, 1]
@pytest.mark.asyncio
async def test_count(assert_run):
xs = stream.count(3, 2)[:4]
await assert_run(xs, [3, 5, 7, 9])
@pytest.mark.asyncio
async def test_iterable(assert_run):
lst = [9, 4, 8, 3, 1]
xs = stream.create.from_iterable(lst)
await assert_run(xs, lst)
xs = stream.iterate(lst)
await assert_run(xs, lst)
@pytest.mark.asyncio
async def test_async_iterable(assert_run, event_loop):
async def agen():
for x in range(2, 5):
yield await asyncio.sleep(1.0, result=x**2)
xs = stream.create.from_async_iterable(agen())
await assert_run(xs, [4, 9, 16])
assert event_loop.steps == [1.0, 1.0, 1.0]
xs = stream.iterate(agen())
await assert_run(xs, [4, 9, 16])
assert event_loop.steps == [1.0, 1.0, 1.0] * 2
@pytest.mark.asyncio
async def test_non_iterable(assert_run):
with pytest.raises(TypeError):
stream.iterate(None)
with pytest.raises(TypeError):
stream.create.from_async_iterable(None)
@pytest.mark.asyncio
async def test_preserve(assert_run, event_loop):
async def agen():
yield 1
yield 2
xs = stream.iterate(agen())[0]
await assert_run(xs, [1])
await assert_run(xs, [], IndexError("Index out of range"))
ys = stream.preserve(agen())[0]
await assert_run(ys, [1])
await assert_run(ys, [2])
|