1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300
|
from __future__ import annotations
import inspect
import weakref
from collections import deque
from collections.abc import Callable, Iterable
from functools import lru_cache, partial, wraps
from types import MethodType
from typing import Any, TypeVar, get_args
from warnings import warn
import packaging.version
from async_lru import __version__ as async_lru_version
from async_lru import alru_cache
from url_matcher import Patterns
def callable_has_parameter(obj: Callable[..., Any], name: str) -> bool:
try:
sig = inspect.signature(obj)
except ValueError: # built-in, e.g. int
return False
else:
return name in sig.parameters
def get_fq_class_name(cls: type) -> str:
"""Return the fully qualified name for a type.
>>> from web_poet import Injectable
>>> get_fq_class_name(Injectable)
'web_poet.pages.Injectable'
>>> from decimal import Decimal
>>> get_fq_class_name(Decimal)
'decimal.Decimal'
"""
return f"{cls.__module__}.{cls.__qualname__}"
def _clspath(cls: type, forced: str | None = None) -> str:
if forced is not None:
return forced
return get_fq_class_name(cls)
def _create_deprecated_class(
name: str,
new_class: type,
clsdict: dict[str, Any] | None = None,
warn_once: bool = True,
old_class_path: str | None = None,
new_class_path: str | None = None,
subclass_warn_message: str = "{cls} inherits from deprecated class {old}, please inherit from {new}.",
instance_warn_message: str = "{cls} is deprecated, instantiate {new} instead.",
) -> type:
"""
Return a "deprecated" class that causes its subclasses to issue a warning.
Subclasses of ``new_class`` are considered subclasses of this class.
It also warns when the deprecated class is instantiated, but do not when
its subclasses are instantiated.
It can be used to rename a base class in a library. For example, if we
have
class OldName(SomeClass):
# ...
and we want to rename it to NewName, we can do the following::
class NewName(SomeClass):
# ...
OldName = _create_deprecated_class('OldName', NewName)
Then, if user class inherits from OldName, warning is issued. Also, if
some code uses ``issubclass(sub, OldName)`` or ``isinstance(sub(), OldName)``
checks they'll still return True if sub is a subclass of NewName instead of
OldName.
"""
class DeprecatedClass(new_class.__class__): # type: ignore[misc, name-defined]
deprecated_class = None
warned_on_subclass = False
def __new__(
metacls, name: str, bases: tuple[type, ...], clsdict_: dict[str, Any]
) -> type:
cls = super().__new__(metacls, name, bases, clsdict_)
if metacls.deprecated_class is None:
metacls.deprecated_class = cls
return cls
def __init__(cls, name: str, bases: tuple[type, ...], clsdict_: dict[str, Any]):
meta = cls.__class__
old = meta.deprecated_class
if old in bases and not (warn_once and meta.warned_on_subclass):
meta.warned_on_subclass = True
msg = subclass_warn_message.format(
cls=_clspath(cls),
old=_clspath(old, old_class_path),
new=_clspath(new_class, new_class_path),
)
if warn_once:
msg += " (warning only on first subclass, there may be others)"
warn(msg, DeprecationWarning, stacklevel=2)
super().__init__(name, bases, clsdict_)
# see https://www.python.org/dev/peps/pep-3119/#overloading-isinstance-and-issubclass
# and https://docs.python.org/reference/datamodel.html#customizing-instance-and-subclass-checks
# for implementation details
def __instancecheck__(cls, inst: Any) -> bool:
return any(cls.__subclasscheck__(c) for c in (type(inst), inst.__class__))
def __subclasscheck__(cls, sub: type) -> bool:
if cls is not DeprecatedClass.deprecated_class:
# we should do the magic only if second `issubclass` argument
# is the deprecated class itself - subclasses of the
# deprecated class should not use custom `__subclasscheck__`
# method.
return super().__subclasscheck__(sub)
if not inspect.isclass(sub):
raise TypeError("issubclass() arg 1 must be a class")
mro = getattr(sub, "__mro__", ())
return any(c in {cls, new_class} for c in mro)
def __call__(cls, *args: Any, **kwargs: Any) -> Any:
old = DeprecatedClass.deprecated_class
if cls is old:
msg = instance_warn_message.format(
cls=_clspath(cls, old_class_path),
new=_clspath(new_class, new_class_path),
)
warn(msg, DeprecationWarning, stacklevel=2)
return super().__call__(*args, **kwargs)
deprecated_cls = DeprecatedClass(name, (new_class,), clsdict or {})
try:
frm = inspect.stack()[1]
parent_module = inspect.getmodule(frm[0])
if parent_module is not None:
deprecated_cls.__module__ = parent_module.__name__
except Exception as e:
# Sometimes inspect.stack() fails (e.g. when the first import of
# deprecated class is in jinja2 template). __module__ attribute is not
# important enough to raise an exception as users may be unable
# to fix inspect.stack() errors.
warn(f"Error detecting parent module: {e!r}", stacklevel=1)
return deprecated_cls
CallableT = TypeVar("CallableT", bound=Callable)
def memoizemethod_noargs(method: CallableT) -> CallableT:
"""Decorator to cache the result of a method (without arguments) using a
weak reference to its object.
It is faster than :func:`cached_method`, and doesn't add new attributes
to the instance, but it doesn't work if objects are unhashable.
"""
cache: weakref.WeakKeyDictionary = weakref.WeakKeyDictionary()
@wraps(method)
def new_method(self, *args, **kwargs):
if self not in cache:
cache[self] = method(self, *args, **kwargs)
return cache[self]
return new_method # type: ignore[return-value]
def cached_method(method: CallableT) -> CallableT:
"""A decorator to cache method or coroutine method results,
so that if it's called multiple times for the same instance,
computation is only done once.
The cache is unbound, but it's tied to the instance lifetime.
.. note::
:func:`cached_method` is needed because :func:`functools.lru_cache`
doesn't work well on methods: self is used as a cache key,
so a reference to an instance is kept in the cache, and this
prevents deallocation of instances.
This decorator adds a new private attribute to the instance named
``_cached_method_{decorated_method_name}``; make sure the class
doesn't define an attribute of the same name.
"""
cached_meth_name = f"_cached_method_{method.__name__}"
if inspect.iscoroutinefunction(method):
meth = _cached_method_async(method, cached_meth_name)
else:
meth = _cached_method_sync(method, cached_meth_name)
meth.cached_method_name = cached_meth_name
return meth
def _cached_method_sync(method, cached_method_name: str):
@wraps(method)
def inner(self, *args, **kwargs):
if not hasattr(self, cached_method_name):
# on a first call, create a lru_cache-wrapped method,
# and store it on the instance
bound_method = MethodType(method, self)
cached_meth = lru_cache(maxsize=None)(bound_method)
setattr(self, cached_method_name, cached_meth)
else:
cached_meth = getattr(self, cached_method_name)
return cached_meth(*args, **kwargs)
return inner
def _cached_method_async(method, cached_method_name: str):
@wraps(method)
async def inner(self, *args, **kwargs):
if not hasattr(self, cached_method_name):
# on a first call, create an alru_cache-wrapped method,
# and store it on the instance
bound_method = MethodType(method, self)
cached_meth = _alru_cache(maxsize=None)(bound_method)
setattr(self, cached_method_name, cached_meth)
else:
cached_meth = getattr(self, cached_method_name)
return await cached_meth(*args, **kwargs)
return inner
# async_lru >= 2.0.0 removed cache_exceptions argument, and changed
# its default value. `_alru_cache` is a compatibility function which works with
# all async_lru versions and uses the same approach for exception caching
# as async_lru >= 2.0.0.
_alru_cache: Callable = alru_cache
_async_lru_version = packaging.version.parse(async_lru_version)
if _async_lru_version.major < 2:
_alru_cache = partial(alru_cache, cache_exceptions=False)
def as_list(value: Any) -> list[Any]:
"""Normalizes the value input as a list.
>>> as_list(None)
[]
>>> as_list("foo")
['foo']
>>> as_list(123)
[123]
>>> as_list(["foo", "bar", 123])
['foo', 'bar', 123]
>>> as_list(("foo", "bar", 123))
['foo', 'bar', 123]
>>> as_list(range(5))
[0, 1, 2, 3, 4]
>>> def gen():
... yield 1
... yield 2
>>> as_list(gen())
[1, 2]
"""
if value is None:
return []
if isinstance(value, str):
return [value]
if not isinstance(value, Iterable):
return [value]
return list(value)
async def ensure_awaitable(obj):
"""Return the value of obj, awaiting it if needed"""
if inspect.isawaitable(obj):
return await obj
return obj
def str_to_pattern(url_pattern: str | Patterns) -> Patterns:
if isinstance(url_pattern, Patterns):
return url_pattern
return Patterns([url_pattern])
def get_generic_param(cls: type, expected: type | tuple[type, ...]) -> type | None:
"""Search the base classes recursively breadth-first for a generic class and return its param.
Returns the param of the first found class that is a subclass of ``expected``.
"""
visited = set()
queue = deque([cls])
while queue:
node = queue.popleft()
visited.add(node)
for base in getattr(node, "__orig_bases__", []):
origin = getattr(base, "__origin__", None)
if origin and issubclass(origin, expected):
result = get_args(base)[0]
if not isinstance(result, TypeVar):
return result
queue.append(base)
return None
|