1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
|
from __pypy__ import get_contextvar_context, set_contextvar_context
from _immutables_map import Map
from _pypy_generic_alias import GenericAlias
try:
from __pypy__ import hidden_applevel
except ImportError:
hidden_applevel = lambda f: f
# implementation taken from PEP-0567 https://www.python.org/dev/peps/pep-0567/
# Sentinel meaning "no default was supplied".  It is compared by identity
# (``is`` / ``is not``), which keeps ``None`` usable as a real default value.
_NO_DEFAULT = object()
class Unsubclassable(type):
    """Metaclass whose instances cannot be used as base classes.

    Creating a class that lists any ``Unsubclassable``-made class among its
    bases raises ``TypeError``.
    """

    def __new__(cls, name, bases, dct):
        # Find the first base that was itself created by this metaclass;
        # such a base is final and must not be inherited from.
        sealed = next(
            (candidate for candidate in bases
             if isinstance(candidate, Unsubclassable)),
            None,
        )
        if sealed is not None:
            raise TypeError(f"type '{sealed.__name__}' is not an acceptable base type")
        # The namespace is copied into a plain dict before the type is built.
        return type.__new__(cls, name, bases, dict(dct))
def get_context():
    """Return the current Context, creating and installing one if needed.

    When ``get_contextvar_context()`` reports ``None``, a fresh ``Context``
    is built, registered through ``set_contextvar_context()`` and returned.
    """
    current = get_contextvar_context()
    if current is not None:
        return current
    current = Context()
    set_contextvar_context(current)
    return current
class Context(metaclass=Unsubclassable):
    """Read-only mapping of ``ContextVar`` objects to their values.

    Instance state:
      * ``_data``       -- a ``Map`` holding the variable -> value bindings.
      * ``_is_entered`` -- True while ``run()`` is executing in this context.
    """

    def __init__(self):
        self._is_entered = False
        self._data = Map()

    @hidden_applevel
    def run(self, callable, *args, **kwargs):
        """Call ``callable(*args, **kwargs)`` with this context installed.

        The previously installed context is restored afterwards, whether the
        call returns or raises.  Raises ``RuntimeError`` if this context is
        already entered.
        """
        if self._is_entered:
            raise RuntimeError(
                f'cannot enter context: {self} is already entered')
        # Read the raw slot instead of calling get_context(), which would
        # create a Context object as a side effect.
        outer = get_contextvar_context()
        try:
            self._is_entered = True
            set_contextvar_context(self)
            result = callable(*args, **kwargs)
        finally:
            set_contextvar_context(outer)
            self._is_entered = False
        return result

    def copy(self):
        """Return a new Context that shares this context's ``_data`` map."""
        duplicate = Context()
        duplicate._data = self._data
        return duplicate

    def __getitem__(self, var):
        """Mapping lookup; only ``ContextVar`` keys are accepted."""
        if isinstance(var, ContextVar):
            return self._data[var]
        raise TypeError("ContextVar key was expected")

    def __contains__(self, var):
        """Mapping membership test; only ``ContextVar`` keys are accepted."""
        if isinstance(var, ContextVar):
            return var in self._data
        raise TypeError("ContextVar key was expected")

    def __len__(self):
        """Number of variables bound in this context."""
        return len(self._data)

    def __iter__(self):
        """Iterate over the underlying ``_data`` map."""
        return iter(self._data)

    def get(self, key, default=None):
        """Return the value bound to *key*, or *default* when it is unbound.

        Raises ``TypeError`` when *key* is not a ``ContextVar``.
        """
        if not isinstance(key, ContextVar):
            raise TypeError("ContextVar key was expected")
        try:
            found = self._data[key]
        except KeyError:
            return default
        return found

    def keys(self):
        """Return a ``KeysView`` over this context."""
        import collections.abc
        return collections.abc.KeysView(self)

    def values(self):
        """Return a ``ValuesView`` over this context."""
        import collections.abc
        return collections.abc.ValuesView(self)

    def items(self):
        """Return an ``ItemsView`` over this context."""
        import collections.abc
        return collections.abc.ItemsView(self)

    def __eq__(self, other):
        """Two contexts are equal when their items form equal dicts."""
        if isinstance(other, Context):
            return dict(self.items()) == dict(other.items())
        return NotImplemented
def copy_context():
    """Return a copy of the current context (one is created if none exists)."""
    current = get_context()
    return current.copy()
class ContextVar(metaclass=Unsubclassable):
    """A context variable as described by PEP 567.

    The variable stores only its name and optional default.  Its values live
    in the ``_data`` map of a ``Context``, keyed by the ContextVar instance.
    """

    def __init__(self, name, *, default=_NO_DEFAULT):
        """Create a variable called *name* with an optional *default*.

        Raises:
            TypeError: if *name* is not a ``str``.
        """
        if not isinstance(name, str):
            raise TypeError("context variable name must be a str")
        self._name = name
        self._default = default

    @property
    def name(self):
        """The name given at construction time (read-only)."""
        return self._name

    def get(self, default=_NO_DEFAULT):
        """Return the value of this variable in the current context.

        Lookup order: the value bound in the current context, then the
        *default* argument, then the default given to the constructor.

        Raises:
            LookupError: if none of the three is available.  The exception
                carries the variable so the message identifies it.
        """
        # don't use get_context() here, to avoid creating a Context object
        context = get_contextvar_context()
        if context is not None:
            try:
                return context[self]
            except KeyError:
                pass
        if default is not _NO_DEFAULT:
            return default
        if self._default is not _NO_DEFAULT:
            return self._default
        # Attach the variable (as CPython does) instead of raising a bare,
        # message-less LookupError.
        raise LookupError(self)

    def set(self, value):
        """Bind *value* to this variable in the current context.

        Returns:
            A ``Token`` recording the context, this variable and the previous
            value (``Token.MISSING`` if there was none), for use with
            ``reset()``.
        """
        context = get_context()
        data = context._data
        try:
            old_value = data[self]
        except KeyError:
            old_value = Token.MISSING
        # ``data.set`` returns an updated map; the context is rebound to it.
        context._data = data.set(self, value)
        return Token(context, self, old_value)

    def reset(self, token):
        """Undo the ``set()`` call that produced *token*.

        Raises:
            TypeError: if *token* is not a ``Token``.
            RuntimeError: if *token* has already been used.
            ValueError: if *token* was created by another ContextVar or in
                another Context.
        """
        # Reject foreign objects explicitly rather than failing with an
        # incidental AttributeError on ``token._used`` below.
        if not isinstance(token, Token):
            raise TypeError(
                f"expected an instance of Token, got {token!r}")
        if token._used:
            raise RuntimeError("Token has already been used once")
        if token._var is not self:
            raise ValueError(
                "Token was created by a different ContextVar")
        context = get_context()
        if token._context is not context:
            raise ValueError(
                "Token was created in a different Context")
        if token._old_value is Token.MISSING:
            # The variable was unbound before set(): remove the binding.
            context._data = context._data.delete(token._var)
        else:
            context._data = context._data.set(token._var, token._old_value)
        token._used = True

    def __class_getitem__(self, item):
        """Support ``ContextVar[T]`` by returning a ``GenericAlias``.

        ``__class_getitem__`` is implicitly a class method, so the first
        argument is the class itself.
        """
        return GenericAlias(self, item)

    def __repr__(self):
        default = ''
        if self._default is not _NO_DEFAULT:
            # Use repr() for the default, like the name, so that e.g. the
            # string '1' and the integer 1 are distinguishable.
            default = f"default={self._default!r} "
        return f"<ContextVar name={self.name!r} {default}at 0x{id(self):x}>"
class Token(metaclass=Unsubclassable):
    """Record of a single ``ContextVar.set()`` call.

    Holds the context the call happened in, the variable that was set, the
    value it had before (or ``Token.MISSING``), and whether the token has
    already been used.
    """

    # Marker stored as the old value when the variable had no value before.
    MISSING = object()

    def __init__(self, context, var, old_value):
        self._used = False
        self._context, self._var, self._old_value = context, var, old_value

    def __class_getitem__(self, item):
        """Support ``Token[T]`` by returning a ``GenericAlias``."""
        return GenericAlias(self, item)

    def __repr__(self):
        return f"<Token {'used ' if self._used else ''}var={self._var} at 0x{id(self):x}>"

    @property
    def old_value(self):
        """Value the variable had before the set, or ``Token.MISSING``."""
        return self._old_value

    @property
    def var(self):
        """The variable this token was created for."""
        return self._var
|