1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324
|
"""
Public testing utilities.
See also _lib._testing for additional private testing utilities.
"""
from __future__ import annotations
import contextlib
from collections.abc import Callable, Iterable, Iterator, Sequence
from functools import wraps
from types import ModuleType
from typing import TYPE_CHECKING, Any, ParamSpec, TypeVar, cast
from ._lib._utils._compat import is_dask_namespace, is_jax_namespace
__all__ = ["lazy_xp_function", "patch_lazy_xp_functions"]
if TYPE_CHECKING: # pragma: no cover
# TODO import override from typing (requires Python >=3.12)
import pytest
from dask.typing import Graph, Key, SchedulerGetCallable
from typing_extensions import override
else:
# Sphinx hacks
SchedulerGetCallable = object
def override(func: object) -> object:
return func
P = ParamSpec("P")
T = TypeVar("T")
_ufuncs_tags: dict[object, dict[str, Any]] = {} # type: ignore[explicit-any]
def lazy_xp_function( # type: ignore[explicit-any]
func: Callable[..., Any],
*,
allow_dask_compute: int = 0,
jax_jit: bool = True,
static_argnums: int | Sequence[int] | None = None,
static_argnames: str | Iterable[str] | None = None,
) -> None: # numpydoc ignore=GL07
"""
Tag a function to be tested on lazy backends.
Tag a function so that when any tests are executed with ``xp=jax.numpy`` the
function is replaced with a jitted version of itself, and when it is executed with
``xp=dask.array`` the function will raise if it attempts to materialize the graph.
This will be later expanded to provide test coverage for other lazy backends.
In order for the tag to be effective, the test or a fixture must call
:func:`patch_lazy_xp_functions`.
Parameters
----------
func : callable
Function to be tested.
allow_dask_compute : int, optional
Number of times `func` is allowed to internally materialize the Dask graph. This
is typically triggered by ``bool()``, ``float()``, or ``np.asarray()``.
Set to 1 if you are aware that `func` converts the input parameters to NumPy and
want to let it do so at least for the time being, knowing that it is going to be
extremely detrimental for performance.
If a test needs values higher than 1 to pass, it is a canary that the conversion
to NumPy/bool/float is happening multiple times, which translates to multiple
computations of the whole graph. Short of making the function fully lazy, you
should at least add explicit calls to ``np.asarray()`` early in the function.
*Note:* the counter of `allow_dask_compute` resets after each call to `func`, so
a test function that invokes `func` multiple times should still work with this
parameter set to 1.
Default: 0, meaning that `func` must be fully lazy and never materialize the
graph.
jax_jit : bool, optional
Set to True to replace `func` with ``jax.jit(func)`` after calling the
:func:`patch_lazy_xp_functions` test helper with ``xp=jax.numpy``. Set to False
if `func` is only compatible with eager (non-jitted) JAX. Default: True.
static_argnums : int | Sequence[int], optional
Passed to jax.jit. Positional arguments to treat as static (compile-time
constant). Default: infer from `static_argnames` using
`inspect.signature(func)`.
static_argnames : str | Iterable[str], optional
Passed to jax.jit. Named arguments to treat as static (compile-time constant).
Default: infer from `static_argnums` using `inspect.signature(func)`.
See Also
--------
patch_lazy_xp_functions : Companion function to call from the test or fixture.
jax.jit : JAX function to compile a function for performance.
Examples
--------
In ``test_mymodule.py``::
from array_api_extra.testing import lazy_xp_function from mymodule import myfunc
lazy_xp_function(myfunc)
def test_myfunc(xp):
a = xp.asarray([1, 2])
# When xp=jax.numpy, this is the same as `b = jax.jit(myfunc)(a)`
# When xp=dask.array, crash on compute() or persist()
b = myfunc(a)
Notes
-----
In order for this tag to be effective, the test function must be imported into the
test module globals without its namespace; alternatively its namespace must be
declared in a ``lazy_xp_modules`` list in the test module globals.
Example 1::
from mymodule import myfunc
lazy_xp_function(myfunc)
def test_myfunc(xp):
x = myfunc(xp.asarray([1, 2]))
Example 2::
import mymodule
lazy_xp_modules = [mymodule]
lazy_xp_function(mymodule.myfunc)
def test_myfunc(xp):
x = mymodule.myfunc(xp.asarray([1, 2]))
A test function can circumvent this monkey-patching system by using a namespace
outside of the two above patterns. You need to sanitize your code to make sure this
only happens intentionally.
Example 1::
import mymodule
from mymodule import myfunc
lazy_xp_function(myfunc)
def test_myfunc(xp):
a = xp.asarray([1, 2])
b = myfunc(a) # This is wrapped when xp=jax.numpy or xp=dask.array
c = mymodule.myfunc(a) # This is not
Example 2::
import mymodule
class naked:
myfunc = mymodule.myfunc
lazy_xp_modules = [mymodule]
lazy_xp_function(mymodule.myfunc)
def test_myfunc(xp):
a = xp.asarray([1, 2])
b = mymodule.myfunc(a) # This is wrapped when xp=jax.numpy or xp=dask.array
c = naked.myfunc(a) # This is not
"""
tags = {
"allow_dask_compute": allow_dask_compute,
"jax_jit": jax_jit,
"static_argnums": static_argnums,
"static_argnames": static_argnames,
}
try:
func._lazy_xp_function = tags # type: ignore[attr-defined] # pylint: disable=protected-access # pyright: ignore[reportFunctionMemberAccess]
except AttributeError: # @cython.vectorize
_ufuncs_tags[func] = tags
def patch_lazy_xp_functions(
request: pytest.FixtureRequest, monkeypatch: pytest.MonkeyPatch, *, xp: ModuleType
) -> None:
"""
Test lazy execution of functions tagged with :func:`lazy_xp_function`.
If ``xp==jax.numpy``, search for all functions which have been tagged with
:func:`lazy_xp_function` in the globals of the module that defines the current test,
as well as in the ``lazy_xp_modules`` list in the globals of the same module,
and wrap them with :func:`jax.jit`. Unwrap them at the end of the test.
If ``xp==dask.array``, wrap the functions with a decorator that disables
``compute()`` and ``persist()`` and ensures that exceptions and warnings are raised
eagerly.
This function should be typically called by your library's `xp` fixture that runs
tests on multiple backends::
@pytest.fixture(params=[numpy, array_api_strict, jax.numpy, dask.array])
def xp(request, monkeypatch):
patch_lazy_xp_functions(request, monkeypatch, xp=request.param)
return request.param
but it can be otherwise be called by the test itself too.
Parameters
----------
request : pytest.FixtureRequest
Pytest fixture, as acquired by the test itself or by one of its fixtures.
monkeypatch : pytest.MonkeyPatch
Pytest fixture, as acquired by the test itself or by one of its fixtures.
xp : array_namespace
Array namespace to be tested.
See Also
--------
lazy_xp_function : Tag a function to be tested on lazy backends.
pytest.FixtureRequest : `request` test function parameter.
"""
mod = cast(ModuleType, request.module)
mods = [mod, *cast(list[ModuleType], getattr(mod, "lazy_xp_modules", []))]
def iter_tagged() -> ( # type: ignore[explicit-any]
Iterator[tuple[ModuleType, str, Callable[..., Any], dict[str, Any]]]
):
for mod in mods:
for name, func in mod.__dict__.items():
tags: dict[str, Any] | None = None # type: ignore[explicit-any]
with contextlib.suppress(AttributeError):
tags = func._lazy_xp_function # pylint: disable=protected-access
if tags is None:
with contextlib.suppress(KeyError, TypeError):
tags = _ufuncs_tags[func]
if tags is not None:
yield mod, name, func, tags
if is_dask_namespace(xp):
for mod, name, func, tags in iter_tagged():
n = tags["allow_dask_compute"]
wrapped = _dask_wrap(func, n)
monkeypatch.setattr(mod, name, wrapped)
elif is_jax_namespace(xp):
import jax
for mod, name, func, tags in iter_tagged():
if tags["jax_jit"]:
# suppress unused-ignore to run mypy in -e lint as well as -e dev
wrapped = cast( # type: ignore[explicit-any]
Callable[..., Any],
jax.jit(
func,
static_argnums=tags["static_argnums"],
static_argnames=tags["static_argnames"],
),
)
monkeypatch.setattr(mod, name, wrapped)
class CountingDaskScheduler(SchedulerGetCallable):
"""
Dask scheduler that counts how many times `dask.compute` is called.
If the number of times exceeds 'max_count', it raises an error.
This is a wrapper around Dask's own 'synchronous' scheduler.
Parameters
----------
max_count : int
Maximum number of allowed calls to `dask.compute`.
msg : str
Assertion to raise when the count exceeds `max_count`.
"""
count: int
max_count: int
msg: str
def __init__(self, max_count: int, msg: str): # numpydoc ignore=GL08
self.count = 0
self.max_count = max_count
self.msg = msg
@override
def __call__(self, dsk: Graph, keys: Sequence[Key] | Key, **kwargs: Any) -> Any: # type: ignore[decorated-any,explicit-any] # numpydoc ignore=GL08
import dask
self.count += 1
# This should yield a nice traceback to the
# offending line in the user's code
assert self.count <= self.max_count, self.msg
return dask.get(dsk, keys, **kwargs) # type: ignore[attr-defined,no-untyped-call] # pyright: ignore[reportPrivateImportUsage]
def _dask_wrap(
func: Callable[P, T], n: int
) -> Callable[P, T]: # numpydoc ignore=PR01,RT01
"""
Wrap `func` to raise if it attempts to call `dask.compute` more than `n` times.
After the function returns, materialize the graph in order to re-raise exceptions.
"""
import dask
func_name = getattr(func, "__name__", str(func))
n_str = f"only up to {n}" if n else "no"
msg = (
f"Called `dask.compute()` or `dask.persist()` {n + 1} times, "
f"but {n_str} calls are allowed. Set "
f"`lazy_xp_function({func_name}, allow_dask_compute={n + 1})` "
"to allow for more (but note that this will harm performance). "
)
@wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> T: # numpydoc ignore=GL08
scheduler = CountingDaskScheduler(n, msg)
with dask.config.set({"scheduler": scheduler}): # pyright: ignore[reportPrivateImportUsage]
out = func(*args, **kwargs)
# Block until the graph materializes and reraise exceptions. This allows
# `pytest.raises` and `pytest.warns` to work as expected. Note that this would
# not work on scheduler='distributed', as it would not block.
return dask.persist(out, scheduler="threads")[0] # type: ignore[attr-defined,no-untyped-call,func-returns-value,index] # pyright: ignore[reportPrivateImportUsage]
return wrapper
|