1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
|
import pytest
from pypy.module.cpyext.test.test_cpyext import AppTestCpythonExtensionBase
from pypy.conftest import option
@pytest.mark.skip("too slow, over 30 seconds for this one test")
class AppTestAsyncIter(AppTestCpythonExtensionBase):
enable_leak_checking = True
def test_asyncgen(self):
""" module is this code after running through cython
async def test_gen():
a = yield 123
assert a is None
yield 456
yield 789
def run_until_complete(coro):
while True:
try:
fut = coro.send(None)
except StopIteration as ex:
return ex.args[0]
def to_list(gen):
async def iterate():
res = []
async for i in gen:
res.append(i)
return res
return run_until_complete(iterate())
"""
module = self.import_module(name='test_asyncgen')
result = module.to_list(module.test_gen())
assert result == [123, 456, 789]
def test_async_gen_exception_04(self):
"""module is this code after running through cython, then making some
small adjustments (see https://github.com/cython/cython/pull/5429)
ZERO = 0
async def gen():
yield 123
1 / ZERO
def test_last_yield(g):
ai = g.__aiter__()
an = ai.__anext__()
try:
next(an)
except StopIteration as ex:
return ex.args
else:
return None
"""
module = self.import_module(name='test_async_gen_exception_04')
g = module.gen()
result = module.test_last_yield(g)
assert result == 123
class AppTestCoroReturn(AppTestCpythonExtensionBase):
enable_leak_checking = True
def test_coro_retval(self):
"""
# Check that the final result of a coroutine is available in the StopIteration
# that should be raised by the final call to its tp_iternext method
body = '''
static PyObject *value_from_stopiteration(void)
{
PyObject *ptype, *pvalue, *ptraceback, *return_value;
PyErr_Fetch(&ptype, &pvalue, &ptraceback);
if (PyErr_GivenExceptionMatches(pvalue, PyExc_StopIteration)) {
return_value = PyObject_GetAttrString(pvalue, "value");
Py_XDECREF(pvalue);
}
else {
return_value = pvalue;
}
Py_XDECREF(ptype);
Py_XDECREF(ptraceback);
return return_value;
}
static PyObject *exhaust_coro(PyObject *self, PyObject *args)
{
PyObject *coro;
if (!PyArg_ParseTuple(args, "O", &coro)) {
return NULL;
}
PyObject *coro_wrapper = PyObject_CallMethod(coro, "__await__", NULL);
if (coro_wrapper == NULL) {
return NULL;
}
PyObject *result;
iternextfunc next = Py_TYPE(coro_wrapper)->tp_iternext;
while (1) {
PyObject *value = next(coro_wrapper);
if (value) {
continue;
}
else if (!PyErr_Occurred()) {
PyErr_SetString(PyExc_AssertionError, "coroutine finished but there was no exception raised");
return NULL;
}
else if (!PyErr_ExceptionMatches(PyExc_StopIteration)) {
result = NULL;
}
else {
result = value_from_stopiteration();
}
break;
}
Py_DECREF(coro_wrapper);
return result;
}
static PyMethodDef methods[] = {
{"exhaust_coro", exhaust_coro, METH_VARARGS},
{NULL}
};
static struct PyModuleDef moduledef = {
PyModuleDef_HEAD_INIT, "test_coro_retval", NULL, -1, methods
};
'''
test_coro_retval = self.import_module(name='test_coro_retval', body=body)
async def test_coro():
return "hi coro"
assert test_coro_retval.exhaust_coro(test_coro()) == "hi coro"
"""
|