File: _contextvars.py

package info (click to toggle)
pypy3 7.3.20%2Bdfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 212,628 kB
  • sloc: python: 2,101,020; ansic: 540,684; sh: 21,462; asm: 14,419; cpp: 4,451; makefile: 4,209; objc: 761; xml: 530; exp: 499; javascript: 314; pascal: 244; lisp: 45; csh: 12; awk: 4
file content (201 lines) | stat: -rw-r--r-- 5,640 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
from __pypy__ import get_contextvar_context, set_contextvar_context
from _immutables_map import Map
from _pypy_generic_alias import GenericAlias
try:
    from __pypy__ import hidden_applevel
except ImportError:
    hidden_applevel = lambda f: f
# implementation taken from PEP-0567 https://www.python.org/dev/peps/pep-0567/

_NO_DEFAULT = object()


class Unsubclassable(type):
    def __new__(cls, name, bases, dct):
        for base in bases:
            if isinstance(base, Unsubclassable):
                raise TypeError(f"type '{base.__name__}' is not an acceptable base type")
        return type.__new__(cls, name, bases, dict(dct))


def get_context():
    context = get_contextvar_context()
    if context is None:
        context = Context()
        set_contextvar_context(context)
    return context


class Context(metaclass=Unsubclassable):

    #_data: Map
    #_is_entered: bool

    def __init__(self):
        self._data = Map()
        self._is_entered = False

    @hidden_applevel
    def run(self, callable, *args, **kwargs):
        if self._is_entered:
            raise RuntimeError(
                f'cannot enter context: {self} is already entered')

        # don't use get_context() here, to avoid creating a Context object
        _prev_context = get_contextvar_context()
        try:
            self._is_entered = True
            set_contextvar_context(self)
            return callable(*args, **kwargs)
        finally:
            set_contextvar_context(_prev_context)
            self._is_entered = False

    def copy(self):
        new = Context()
        new._data = self._data
        return new

    # Implement abstract Mapping.__getitem__
    def __getitem__(self, var):
        if not isinstance(var, ContextVar):
            raise TypeError("ContextVar key was expected")
        return self._data[var]

    # Implement abstract Mapping.__contains__
    def __contains__(self, var):
        if not isinstance(var, ContextVar):
            raise TypeError("ContextVar key was expected")
        return var in self._data

    # Implement abstract Mapping.__len__
    def __len__(self):
        return len(self._data)

    # Implement abstract Mapping.__iter__
    def __iter__(self):
        return iter(self._data)

    def get(self, key, default=None):
        if not isinstance(key, ContextVar):
            raise TypeError("ContextVar key was expected")
        try:
            return self._data[key]
        except KeyError:
            return default

    def keys(self):
        from collections.abc import KeysView
        return KeysView(self)

    def values(self):
        from collections.abc import ValuesView
        return ValuesView(self)

    def items(self):
        from collections.abc import ItemsView
        return ItemsView(self)

    def __eq__(self, other):
        if not isinstance(other, Context):
            return NotImplemented
        return dict(self.items()) == dict(other.items())


def copy_context():
    return get_context().copy()

class ContextVar(metaclass=Unsubclassable):

    def __init__(self, name, *, default=_NO_DEFAULT):
        if not isinstance(name, str):
            raise TypeError("context variable name must be a str")
        self._name = name
        self._default = default

    @property
    def name(self):
        return self._name

    def get(self, default=_NO_DEFAULT):
        # don't use get_context() here, to avoid creating a Context object
        context = get_contextvar_context()
        if context is not None:
            try:
                return context[self]
            except KeyError:
                pass

        if default is not _NO_DEFAULT:
            return default

        if self._default is not _NO_DEFAULT:
            return self._default

        raise LookupError

    def set(self, value):
        context = get_context()

        data: Map = context._data
        try:
            old_value = data[self]
        except KeyError:
            old_value = Token.MISSING

        updated_data = data.set(self, value)
        context._data = updated_data
        return Token(context, self, old_value)

    def reset(self, token):
        if token._used:
            raise RuntimeError("Token has already been used once")

        if token._var is not self:
            raise ValueError(
                "Token was created by a different ContextVar")

        context = get_context()
        if token._context is not context:
            raise ValueError(
                "Token was created in a different Context")

        if token._old_value is Token.MISSING:
            context._data = context._data.delete(token._var)
        else:
            context._data = context._data.set(token._var, token._old_value)

        token._used = True

    def __class_getitem__(self, item):
        return GenericAlias(self, item)

    def __repr__(self):
        default = ''
        if self._default is not _NO_DEFAULT:
            default = f"default={self._default} "
        return f"<ContextVar name={self.name!r} {default}at 0x{id(self):x}>"


class Token(metaclass=Unsubclassable):
    MISSING = object()

    def __init__(self, context, var, old_value):
        self._context = context
        self._var = var
        self._old_value = old_value
        self._used = False

    @property
    def var(self):
        return self._var

    @property
    def old_value(self):
        return self._old_value

    def __repr__(self):
        return f"<Token {'used ' if self._used else ''}var={self._var} at 0x{id(self):x}>"

    def __class_getitem__(self, item):
        return GenericAlias(self, item)