File: test_async_iter.py

package info (click to toggle)
pypy3 7.3.19%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 212,236 kB
  • sloc: python: 2,098,316; ansic: 540,565; sh: 21,462; asm: 14,419; cpp: 4,451; makefile: 4,209; objc: 761; xml: 530; exp: 499; javascript: 314; pascal: 244; lisp: 45; csh: 12; awk: 4
file content (137 lines) | stat: -rw-r--r-- 4,522 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import pytest
from pypy.module.cpyext.test.test_cpyext import AppTestCpythonExtensionBase
from pypy.conftest import option


@pytest.mark.skip("too slow, over 30 seconds for this one test")
class AppTestAsyncIter(AppTestCpythonExtensionBase):
    enable_leak_checking = True

    def test_asyncgen(self):
        """ module is this code after running through cython
            async def test_gen():
                a = yield 123
                assert a is None
                yield 456
                yield 789

            def run_until_complete(coro):
                while True:
                    try:
                        fut = coro.send(None)
                    except StopIteration as ex:
                        return ex.args[0]

            def to_list(gen):
                async def iterate():
                    res = []
                    async for i in gen:
                        res.append(i)
                    return res

                return run_until_complete(iterate())
            """
        module = self.import_module(name='test_asyncgen')
        result = module.to_list(module.test_gen())
        assert result == [123, 456, 789]


    def test_async_gen_exception_04(self):
        """module is this code after running through cython, then making some
           small adjustments (see https://github.com/cython/cython/pull/5429)
            ZERO = 0

            async def gen():
                yield 123
                1 / ZERO

            def test_last_yield(g):
                ai = g.__aiter__()
                an = ai.__anext__()
                try:
                    next(an) 
                except StopIteration as ex:
                    return ex.args
                else:
                    return None 
             
        """
        module = self.import_module(name='test_async_gen_exception_04')
        g = module.gen()
        result = module.test_last_yield(g)
        assert result == 123 


class AppTestCoroReturn(AppTestCpythonExtensionBase):
    enable_leak_checking = True

    def test_coro_retval(self):
        """
        # Check that the final result of a coroutine is available in the StopIteration
        # that should be raised by the final call to its tp_iternext method
        body = '''
        static PyObject *value_from_stopiteration(void)
        {
            PyObject *ptype, *pvalue, *ptraceback, *return_value;
            PyErr_Fetch(&ptype, &pvalue, &ptraceback);
            if (PyErr_GivenExceptionMatches(pvalue, PyExc_StopIteration)) {
                return_value = PyObject_GetAttrString(pvalue, "value");
                Py_XDECREF(pvalue);
            }
            else {
                return_value = pvalue;
            }
            Py_XDECREF(ptype);
            Py_XDECREF(ptraceback);
            return return_value;
        }

        static PyObject *exhaust_coro(PyObject *self, PyObject *args)
        {
            PyObject *coro;
            if (!PyArg_ParseTuple(args, "O", &coro)) {
                return NULL;
            }
            PyObject *coro_wrapper = PyObject_CallMethod(coro, "__await__", NULL);
            if (coro_wrapper == NULL) {
                return NULL;
            }
            PyObject *result;
            iternextfunc next = Py_TYPE(coro_wrapper)->tp_iternext;
            while (1) {
                PyObject *value = next(coro_wrapper);
                if (value) {
                    continue;
                }
                else if (!PyErr_Occurred()) {
                    PyErr_SetString(PyExc_AssertionError, "coroutine finished but there was no exception raised");
                    return NULL;
                }
                else if (!PyErr_ExceptionMatches(PyExc_StopIteration)) {
                    result = NULL;
                }
                else {
                    result = value_from_stopiteration();
                }
                break;
            }

            Py_DECREF(coro_wrapper);
            return result;
        }

        static PyMethodDef methods[] = {
            {"exhaust_coro", exhaust_coro, METH_VARARGS},
            {NULL}
        };

        static struct PyModuleDef moduledef = {
            PyModuleDef_HEAD_INIT, "test_coro_retval", NULL, -1, methods
        };
        '''
        test_coro_retval = self.import_module(name='test_coro_retval', body=body)

        async def test_coro():
            return "hi coro"
        assert test_coro_retval.exhaust_coro(test_coro()) == "hi coro"
        """