File: contextvars.py

package info (click to toggle)
pypy3 7.3.19%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 212,236 kB
  • sloc: python: 2,098,316; ansic: 540,565; sh: 21,462; asm: 14,419; cpp: 4,451; makefile: 4,209; objc: 761; xml: 530; exp: 499; javascript: 314; pascal: 244; lisp: 45; csh: 12; awk: 4
file content (72 lines) | stat: -rw-r--r-- 2,785 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# Implement the PyContextVar* functions in terms of the pure-python
# _contextvars module

from pypy.module.cpyext.api import (cpython_api, PyObject, PyObjectP,
    CONST_STRING)
from pypy.module.cpyext.pyobject import make_ref, from_ref
from rpython.rtyper.lltypesystem import rffi
from pypy.module.cpyext.pyobject import incref
from pypy.interpreter.error import OperationError

@cpython_api([CONST_STRING, PyObject], PyObject)
def PyContextVar_New(space, name, default):
    """Create a ContextVar through the app-level _contextvars module.

    A null `name` produces an empty variable name; otherwise the C string
    is wrapped as bytes and decoded as UTF-8.  When `default` is non-null
    it is converted with from_ref() and handed to ContextVar as its
    `default=` keyword; when it is null, ContextVar is built from the name
    alone.
    """
    if not name:
        w_varname = space.newtext('')
    else:
        # Decode the raw C bytes via the app-level bytes.decode('utf-8').
        w_raw = space.newbytes(rffi.charp2str(name))
        w_varname = space.call_method(w_raw, 'decode', space.newtext("utf-8"))

    if not default:
        # No default supplied from C: build the variable from its name only.
        return space.appexec([w_varname], """(name,):
            from _contextvars import ContextVar
            return ContextVar(name)
            """)

    w_default = from_ref(space, default)
    return space.appexec([w_varname, w_default], """(name, default):
            from _contextvars import ContextVar
            return ContextVar(name, default=default)
            """)

@cpython_api([PyObject, PyObject], PyObject)
def PyContextVar_Set(space, w_ovar, w_val):
    """Call `w_ovar.set(w_val)` at app level and return its result.

    The app-level helper raises TypeError when `w_ovar` is not an instance
    of _contextvars.ContextVar.
    """
    w_result = space.appexec([w_ovar, w_val], """(ovar, val):
        from _contextvars import ContextVar
        if not isinstance(ovar, ContextVar):
            raise TypeError('an instance of ContextVar was expected')
        return ovar.set(val)
        """)
    return w_result

@cpython_api([PyObject, PyObject], PyObject)
def PyContextVar_Reset(space, w_ovar, w_token):
    """Call `w_ovar.reset(w_token)` at app level and return its result.

    The app-level helper raises TypeError when `w_ovar` is not an instance
    of _contextvars.ContextVar.

    NOTE(review): CPython documents PyContextVar_Reset as returning an int
    (0 on success, -1 on error), whereas this wrapper is declared with a
    PyObject result and returns whatever ovar.reset(token) returns.
    Confirm whether C callers rely on the CPython convention.
    """
    w_result = space.appexec([w_ovar, w_token], """(ovar, token):
        from _contextvars import ContextVar
        if not isinstance(ovar, ContextVar):
            raise TypeError('an instance of ContextVar was expected')
        return ovar.reset(token)
        """)
    return w_result

@cpython_api([PyObject, PyObject, PyObjectP], rffi.INT_real, error=-1)
def PyContextVar_Get(space, w_ovar, default, val):
    """Fetch the value of `w_ovar` and store a reference to it in `val[0]`.

    With a non-null `default`, the lookup is `ovar.get(default)`.  With a
    null `default`, the lookup is `ovar.get()`; if that raises LookupError,
    `val[0]` is set to a null pointer and 0 is returned.  Any other
    exception propagates (the decorator declares -1 as the error value).
    On success `val[0]` receives make_ref() of the result and 0 is
    returned.

    The app-level helpers raise TypeError when `w_ovar` is not an instance
    of _contextvars.ContextVar.
    """
    if not default:
        try:
            w_found = space.appexec([w_ovar], """(ovar,):
                from _contextvars import ContextVar
                if not isinstance(ovar, ContextVar):
                    raise TypeError('an instance of ContextVar was expected')
                return ovar.get()
            """)
        except OperationError as operr:
            if not operr.match(space, space.w_LookupError):
                raise operr
            # No value found: report "nothing" through a null pointer
            # while still returning 0.
            val[0] = rffi.cast(PyObject, 0)
            return 0
    else:
        w_fallback = from_ref(space, default)
        w_found = space.appexec([w_ovar, w_fallback], """(ovar, default):
            from _contextvars import ContextVar
            if not isinstance(ovar, ContextVar):
                raise TypeError('an instance of ContextVar was expected')
            return ovar.get(default)
        """)

    val[0] = make_ref(space, w_found)
    return 0