File: test_utils_asyncgen.py

package info (click to toggle)
python-scrapy 2.14.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 6,316 kB
  • sloc: python: 55,421; xml: 199; makefile: 25; sh: 7
file content (16 lines) | stat: -rw-r--r-- 552 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from scrapy.utils.asyncgen import as_async_generator, collect_asyncgen
from scrapy.utils.defer import deferred_f_from_coro_f


class TestAsyncgenUtils:
    """Tests for the helpers imported from ``scrapy.utils.asyncgen``.

    Each test is a coroutine wrapped with ``deferred_f_from_coro_f``
    (presumably so the test runner can drive it -- confirm against
    ``scrapy.utils.defer``).
    """

    @deferred_f_from_coro_f
    async def test_as_async_generator(self):
        """Iterating ``as_async_generator(range(42))`` yields 0..41 in order."""
        expected = list(range(42))
        wrapped = as_async_generator(range(42))
        # Drain the async generator item by item with an explicit loop.
        collected = []
        async for item in wrapped:
            collected.append(item)
        assert collected == expected

    @deferred_f_from_coro_f
    async def test_collect_asyncgen(self):
        """Awaiting ``collect_asyncgen`` on the wrapped range gives 0..41."""
        expected = list(range(42))
        collected = await collect_asyncgen(as_async_generator(range(42)))
        assert collected == expected