File: coroutines.py

package info (click to toggle)
cython 3.0.11%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 19,092 kB
  • sloc: python: 83,539; ansic: 18,831; cpp: 1,402; xml: 1,031; javascript: 511; makefile: 403; sh: 204; sed: 11
file content (60 lines) | stat: -rw-r--r-- 1,375 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# cython: language_level=3
# mode: run
# tag: pep492, pure3.5, gh1462, async, await


async def test_coroutine_frame(awaitable):
    """
    >>> class Awaitable(object):
    ...     def __await__(self):
    ...         return iter([2])

    >>> coro = test_coroutine_frame(Awaitable())
    >>> import types
    >>> isinstance(coro.cr_frame, types.FrameType) or coro.cr_frame
    True
    >>> coro.cr_frame is coro.cr_frame  # assert that it's cached
    True
    >>> coro.cr_frame.f_code is not None
    True
    >>> code_obj = coro.cr_frame.f_code
    >>> code_obj.co_argcount
    1
    >>> code_obj.co_varnames
    ('awaitable', 'b')

    >>> next(coro.__await__())  # avoid "not awaited" warning
    2
    """
    b = await awaitable
    return b


# gh1462: Using decorators on coroutines.

def pass_through(func):
    return func


@pass_through
async def test_pass_through():
    """
    >>> t = test_pass_through()
    >>> try: t.send(None)
    ... except StopIteration as ex:
    ...     print(ex.args[0] if ex.args else None)
    ... else: print("NOT STOPPED!")
    None
    """


@pass_through(pass_through)
async def test_pass_through_with_args():
    """
    >>> t = test_pass_through_with_args()
    >>> try: t.send(None)
    ... except StopIteration as ex:
    ...     print(ex.args[0] if ex.args else None)
    ... else: print("NOT STOPPED!")
    None
    """