File: async_def.pyx

package info (click to toggle)
cython 3.0.11%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 19,092 kB
  • sloc: python: 83,539; ansic: 18,831; cpp: 1,402; xml: 1,031; javascript: 511; makefile: 403; sh: 204; sed: 11
file content (106 lines) | stat: -rw-r--r-- 2,762 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# cython: language_level=3str, binding=True
# mode: run
# tag: pep492, await, gh3337

"""
Cython specific tests in addition to "test_coroutines_pep492.pyx"
(which is copied from CPython).
"""

import sys


def run_async(coro, assert_type=True, send_value=None):
    if assert_type:
        #assert coro.__class__ is types.GeneratorType
        assert coro.__class__.__name__ in ('coroutine', '_GeneratorWrapper'), coro.__class__.__name__

    buffer = []
    result = None
    while True:
        try:
            buffer.append(coro.send(send_value))
        except StopIteration as ex:
            result = ex.value if sys.version_info >= (3, 5) else ex.args[0] if ex.args else None
            break
    return buffer, result


async def test_async_temp_gh3337(x, y):
    """
    >>> run_async(test_async_temp_gh3337(2, 3))
    ([], -1)
    >>> run_async(test_async_temp_gh3337(3, 2))
    ([], 0)
    """
    return min(x - y, 0)


async def outer_with_nested(called):
    """
    >>> called = []
    >>> _, inner = run_async(outer_with_nested(called))
    >>> called  # after outer_with_nested()
    ['outer', 'make inner', 'deco', 'return inner']
    >>> _ = run_async(inner())
    >>> called  # after inner()
    ['outer', 'make inner', 'deco', 'return inner', 'inner']
    """
    called.append('outer')

    def deco(f):
        called.append('deco')
        return f

    called.append('make inner')

    @deco
    async def inner():
        called.append('inner')
        return 1

    called.append('return inner')
    return inner

# used in "await_in_genexpr_iterator"
async def h(arg):
    return [arg, arg+1]

async def await_in_genexpr_iterator():
    """
    >>> _, x = run_async(await_in_genexpr_iterator())
    >>> x
    [4, 6]
    """
    lst = list  # obfuscate from any optimizations cython might try
    return lst(x*2 for x in await h(2))

def yield_in_genexpr_iterator():
    """
    Same test as await_in_genexpr_iterator but with yield.
    (Possibly in the wrong place, but grouped with related tests)

    >>> g = yield_in_genexpr_iterator()
    >>> g.send(None)
    >>> _, x = run_async(g, assert_type=False, send_value=[2, 3])
    >>> x
    [4, 6]
    """
    lst = list  # obfuscate from any optimizations cython might try
    return lst(x*2 for x in (yield))

def h_yield_from(arg):
    yield
    return [arg, arg+1]

def yield_from_in_genexpr_iterator():
    """
    Same test as await_in_genexpr_iterator but with "yield from".
    (Possibly in the wrong place, but grouped with related tests)

    >>> _, x = run_async(yield_from_in_genexpr_iterator(), assert_type=False)
    >>> x
    [4, 6]
    """
    lst = list  # obfuscate from any optimizations cython might try
    return lst(x*2 for x in (yield from h_yield_from(2)))