File: test_time.py

package info (click to toggle)
python-aiostream 0.7.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 416 kB
  • sloc: python: 2,800; makefile: 5
file content (18 lines) | stat: -rw-r--r-- 499 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import pytest
import asyncio

from aiostream import stream, pipe


@pytest.mark.asyncio
async def test_timeout(assert_run, assert_cleanup):
    """Exercise ``pipe.timeout`` on a stream that finishes and one that stalls.

    ``assert_run`` and ``assert_cleanup`` are fixtures defined outside this
    file. NOTE(review): ``loop.steps`` is presumably the list of delays the
    test loop recorded while running -- confirm against the fixture code.
    """
    # Finite stream under a timeout of 5: the run is expected to produce
    # 0, 1, 2 and leave no recorded steps.
    with assert_cleanup() as loop:
        xs = stream.range(3) | pipe.timeout(5)
        await assert_run(xs, [0, 1, 2])
        assert loop.steps == []

    # Range followed by ``stream.never()`` under a timeout of 1: the run is
    # expected to produce 0, 1, 2 and then end with ``asyncio.TimeoutError``.
    # ``loop`` is bound explicitly here so the final assertion does not
    # depend on the name left over from the previous ``with`` block.
    with assert_cleanup() as loop:
        xs = stream.range(3) + stream.never()
        ys = xs | pipe.timeout(1)
        await assert_run(ys, [0, 1, 2], asyncio.TimeoutError())
        assert loop.steps == [1]