File: test_utils_asyncgen.py

package info (click to toggle)
python-scrapy 2.13.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 5,664 kB
  • sloc: python: 52,028; xml: 199; makefile: 25; sh: 7
file content (18 lines) | stat: -rw-r--r-- 607 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from twisted.trial import unittest

from scrapy.utils.asyncgen import as_async_generator, collect_asyncgen
from scrapy.utils.defer import deferred_f_from_coro_f


class TestAsyncgenUtils(unittest.TestCase):
    """Checks for the ``scrapy.utils.asyncgen`` helpers.

    Each test is a coroutine function wrapped with ``deferred_f_from_coro_f``
    and feeds ``range(42)`` through ``as_async_generator``.
    """

    @deferred_f_from_coro_f
    async def test_as_async_generator(self):
        # Drain the async generator manually and verify that every item of
        # the source iterable comes back, in the original order.
        expected = list(range(42))
        collected = []
        async for item in as_async_generator(range(42)):
            collected.append(item)
        assert collected == expected

    @deferred_f_from_coro_f
    async def test_collect_asyncgen(self):
        # collect_asyncgen is awaited and must produce a list equal to the
        # items of the wrapped iterable.
        source = range(42)
        collected = await collect_asyncgen(as_async_generator(source))
        assert collected == list(source)