1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
|
# NOT_RPYTHON
"""
Plain Python definition of the builtin ABC-related functions.
"""
# Cannot use _weakrefset like in _py_abc.py, since it is not a built-in module
import _abc
from _weakref import ref
# Bit mask covering both collection type flags. _abc_init() checks a class's
# __abc_tpflags__ against it, and _abc_register() uses it to select the flag
# bits that are passed on to a newly registered subclass.
COLLECTION_FLAGS = (1 << 5) | (1 << 6) # Py_TPFLAGS_SEQUENCE | Py_TPFLAGS_MAPPING
# We'll make our own WeakSet instead, with only the functionality that's needed
# This replaces the easily forgotten calls to '_add_to_weak_set' and
# '_in_weak_set' from the CPython implementation
class SimpleWeakSet:
    """Minimal set of weakly referenced objects.

    Only the operations needed by the ABC helpers are provided: iteration,
    membership testing, ``add`` and ``clear``.  Members are stored in
    ``self.data`` as weak references, so the set does not keep them alive.
    """

    def __init__(self, data=None):
        """Create the set, optionally seeded with the items of *data*.

        *data* may be any iterable of weak-referenceable objects; ``None``
        (the default) creates an empty set.
        """
        self.data = set()

        def _remove(item, selfref=ref(self)):
            # Weakref callback: *item* is the dead reference.  The set itself
            # is only reached through a weak reference, so the callback does
            # not keep this SimpleWeakSet alive.
            self = selfref()
            if self is not None:
                self.data.discard(item)

        self._remove = _remove
        # Honour the constructor argument instead of silently dropping it.
        if data is not None:
            for item in data:
                self.add(item)

    def __iter__(self):
        """Yield every member whose referent is still alive."""
        # Weakref callback may remove entry from set.
        # So we make a copy first.
        copy = list(self.data)
        for itemref in copy:
            item = itemref()
            if item is not None:
                yield item

    def __contains__(self, item):
        """Return True if *item* is a member.

        Objects that cannot be weakly referenced are never members, so they
        yield False instead of raising TypeError.
        """
        try:
            wr = ref(item)
        except TypeError:
            return False
        return wr in self.data

    def add(self, item):
        """Add *item*; its entry is discarded once *item* is collected."""
        self.data.add(ref(item, self._remove))

    def clear(self):
        """Remove all members."""
        self.data.clear()
# Module-wide version number for the negative caches: _abc_register()
# increments it and get_cache_token() returns it.
abc_invalidation_counter = 0
def get_cache_token():
    """Return the current ABC cache token.

    The token is an opaque value that supports equality comparison and
    identifies the current version of the ABC cache for virtual subclasses.
    It changes with every call to ``register()`` on any ABC.
    """
    token = abc_invalidation_counter
    return token
def _abc_init(cls):
"""Internal ABC helper for class set-up. Should be never used outside abc module."""
namespace = cls.__dict__
abstracts = {name
for name, value in namespace.items()
if getattr(value, "__isabstractmethod__", False)}
bases = cls.__bases__
for base in bases:
for name in getattr(base, "__abstractmethods__", set()):
value = getattr(cls, name, None)
if getattr(value, "__isabstractmethod__", False):
abstracts.add(name)
cls.__abstractmethods__ = frozenset(abstracts)
# Set up inheritance registry
cls._abc_registry = SimpleWeakSet()
cls._abc_cache = SimpleWeakSet()
cls._abc_negative_cache = SimpleWeakSet()
cls._abc_negative_cache_version = abc_invalidation_counter
if isinstance(cls, type):
flags = cls.__dict__.get("__abc_tpflags__")
if flags is not None:
if flags & COLLECTION_FLAGS == COLLECTION_FLAGS:
raise TypeError("__abc_tpflags__ cannot be both Py_TPFLAGS_SEQUENCE and Py_TPFLAGS_MAPPING")
_abc._internal_set_collection_flag(cls, flags)
del cls.__abc_tpflags__
def _abc_register(cls, subclass):
"""Internal ABC helper for subclasss registration. Should be never used outside abc module."""
if not isinstance(subclass, type):
raise TypeError("Can only register classes")
if issubclass(subclass, cls):
return subclass # Already a subclass
# Subtle: test for cycles *after* testing for "already a subclass";
# this means we allow X.register(X) and interpret it as a no-op.
if issubclass(cls, subclass):
# This would create a cycle, which is bad for the algorithm below
raise RuntimeError("Refusing to create an inheritance cycle")
cls._abc_registry.add(subclass)
global abc_invalidation_counter
abc_invalidation_counter += 1 # Invalidate negative cache
if isinstance(cls, type):
collection_flag = cls.__flags__ & COLLECTION_FLAGS
if collection_flag:
_abc._internal_set_collection_flag_recursive(subclass, collection_flag)
return subclass
def _abc_instancecheck(cls, instance):
"""Internal ABC helper for instance checks. Should be never used outside abc module."""
# Inline the cache checking
subclass = instance.__class__
if subclass in cls._abc_cache:
return True
subtype = type(instance)
if subtype is subclass:
if (cls._abc_negative_cache_version == abc_invalidation_counter and
subclass in cls._abc_negative_cache):
return False
# Fall back to the subclass check.
return cls.__subclasscheck__(subclass)
return any(cls.__subclasscheck__(c) for c in (subclass, subtype))
def _abc_subclasscheck(cls, subclass):
"""Internal ABC helper for subclasss checks. Should be never used outside abc module."""
if not isinstance(subclass, type):
raise TypeError('issubclass() arg 1 must be a class')
# Check cache
if subclass in cls._abc_cache:
return True
# Check negative cache; may have to invalidate
if cls._abc_negative_cache_version < abc_invalidation_counter:
# Invalidate the negative cache
cls._abc_negative_cache = SimpleWeakSet()
cls._abc_negative_cache_version = abc_invalidation_counter
elif subclass in cls._abc_negative_cache:
return False
# Check the subclass hook
ok = cls.__subclasshook__(subclass)
if ok is not NotImplemented:
assert isinstance(ok, bool)
if ok:
cls._abc_cache.add(subclass)
else:
cls._abc_negative_cache.add(subclass)
return ok
# Check if it's a direct subclass
if cls in getattr(subclass, '__mro__', ()):
cls._abc_cache.add(subclass)
return True
# Check if it's a subclass of a registered class (recursive)
for rcls in cls._abc_registry:
if issubclass(subclass, rcls):
cls._abc_cache.add(subclass)
return True
# Check if it's a subclass of a subclass (recursive)
for scls in cls.__subclasses__():
if issubclass(subclass, scls):
cls._abc_cache.add(subclass)
return True
# No dice; update negative cache
cls._abc_negative_cache.add(subclass)
return False
def _get_dump(cls):
"""Internal ABC helper for cache and registry debugging
Return shallow copies of registry, of both caches, and
negative cache version. Don't call this function directly,
instead use ABC._dump_registry() for a nice repr."""
return (cls._abc_registry.data,
cls._abc_cache.data,
cls._abc_negative_cache.data,
cls._abc_negative_cache_version)
def _reset_registry(cls):
"""Internal ABC helper to reset registry of a given class.
Should be only used by refleak.py"""
cls._abc_registry.clear()
def _reset_caches(cls):
"""Internal ABC helper to reset both caches of a given class.
Should be only used by refleak.py"""
cls._abc_cache.clear()
cls._abc_negative_cache.clear()
|