File: plugins.py

package info (click to toggle)
python-xarray 2025.08.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 11,796 kB
  • sloc: python: 115,416; makefile: 258; sh: 47
file content (222 lines) | stat: -rw-r--r-- 8,304 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
from __future__ import annotations

import functools
import inspect
import itertools
import warnings
from collections.abc import Callable
from importlib.metadata import entry_points
from typing import TYPE_CHECKING, Any

from xarray.backends.common import BACKEND_ENTRYPOINTS, BackendEntrypoint
from xarray.core.utils import module_available

if TYPE_CHECKING:
    import os
    from importlib.metadata import EntryPoint, EntryPoints

    from xarray.backends.common import AbstractDataStore
    from xarray.core.types import ReadBuffer

# Engine names that ``sort_backends`` places first, in this order, when they
# are present; every other engine follows in alphabetical order.
STANDARD_BACKENDS_ORDER = ["netcdf4", "h5netcdf", "scipy"]


def remove_duplicates(entrypoints: EntryPoints) -> list[EntryPoint]:
    """
    Keep a single entrypoint per engine name.

    Parameters
    ----------
    entrypoints
        Iterable of entrypoints, possibly containing several entries that
        share the same ``name``.

    Returns
    -------
    list
        One entrypoint per distinct name, ordered by name. When several
        *different* entrypoints share a name, the one that appears first in
        ``entrypoints`` is kept and a ``RuntimeWarning`` listing all candidate
        modules is emitted. Entrypoints that compare equal are collapsed
        silently.
    """

    def by_name(ep):
        return ep.name

    # ``groupby`` only merges adjacent items, hence the sort. ``sorted`` is
    # stable, so entries sharing a name keep their original relative order.
    entrypoints_grouped = itertools.groupby(
        sorted(entrypoints, key=by_name), key=by_name
    )

    unique_entrypoints = []
    for name, group in entrypoints_grouped:
        # Drop equal entrypoints while preserving order. A ``set`` would make
        # the selected entrypoint depend on hash ordering, i.e. vary between
        # interpreter runs.
        matches = list(dict.fromkeys(group))
        unique_entrypoints.append(matches[0])
        matches_len = len(matches)
        if matches_len > 1:
            all_module_names = [e.value.split(":")[0] for e in matches]
            selected_module_name = all_module_names[0]
            warnings.warn(
                f"Found {matches_len} entrypoints for the engine name {name}:"
                f"\n {all_module_names}.\n "
                f"The entrypoint {selected_module_name} will be used.",
                RuntimeWarning,
                stacklevel=2,
            )
    return unique_entrypoints


def detect_parameters(open_dataset: Callable) -> tuple[str, ...]:
    """
    Return the explicit parameter names of ``open_dataset``.

    Parameters
    ----------
    open_dataset
        Callable whose signature is inspected.

    Returns
    -------
    tuple of str
        Parameter names in declaration order, with ``self`` left out.

    Raises
    ------
    TypeError
        If the signature contains ``*args`` or ``**kwargs``.
    """
    variadic_kinds = (
        inspect.Parameter.VAR_KEYWORD,
        inspect.Parameter.VAR_POSITIONAL,
    )
    declared = inspect.signature(open_dataset).parameters

    # Reject variadic signatures up front; the names cannot be enumerated.
    if any(param.kind in variadic_kinds for param in declared.values()):
        raise TypeError(
            f"All the parameters in {open_dataset!r} signature should be explicit. "
            "*args and **kwargs is not supported"
        )

    return tuple(name for name in declared if name != "self")


def backends_dict_from_pkg(
    entrypoints: list[EntryPoint],
) -> dict[str, type[BackendEntrypoint]]:
    """
    Load every entrypoint and map its name to the loaded object.

    An entrypoint whose ``load()`` raises is skipped: a ``RuntimeWarning``
    carrying the engine name and the exception text is emitted instead of
    propagating the error.
    """
    loaded = {}
    for ep in entrypoints:
        name = ep.name
        try:
            backend_cls = ep.load()
        except Exception as ex:
            warnings.warn(
                f"Engine {name!r} loading failed:\n{ex}", RuntimeWarning, stacklevel=2
            )
        else:
            loaded[name] = backend_cls
    return loaded


def set_missing_parameters(
    backend_entrypoints: dict[str, type[BackendEntrypoint]],
) -> None:
    """
    Fill in ``open_dataset_parameters`` on backends that left it as ``None``.

    The attribute is set in place on each backend object, using the parameter
    names detected from its ``open_dataset`` signature. Backends that already
    declare the attribute are not touched.
    """
    for backend_cls in backend_entrypoints.values():
        if backend_cls.open_dataset_parameters is not None:
            continue
        backend_cls.open_dataset_parameters = detect_parameters(
            backend_cls.open_dataset
        )

def sort_backends(
    backend_entrypoints: dict[str, type[BackendEntrypoint]],
) -> dict[str, type[BackendEntrypoint]]:
    """
    Return the backends in lookup order.

    Engines named in ``STANDARD_BACKENDS_ORDER`` come first, in that order,
    provided they are present with a non-``None`` value. All remaining engines
    follow, sorted alphabetically by name.

    Parameters
    ----------
    backend_entrypoints
        Mapping of engine name to backend. It is not modified.

    Returns
    -------
    dict
        A new dictionary with the same items in the order described above.
    """
    # Build the result from lookups only, so the caller's dictionary is left
    # intact instead of having the standard engines popped out of it.
    ordered_backends_entrypoints = {
        be_name: backend_entrypoints[be_name]
        for be_name in STANDARD_BACKENDS_ORDER
        if backend_entrypoints.get(be_name) is not None
    }
    ordered_backends_entrypoints.update(
        (name, backend_entrypoints[name])
        for name in sorted(backend_entrypoints)
        if name not in ordered_backends_entrypoints
    )
    return ordered_backends_entrypoints


def build_engines(entrypoints: EntryPoints) -> dict[str, BackendEntrypoint]:
    """
    Assemble the engine dictionary from built-in and entrypoint backends.

    Built-in backends from ``BACKEND_ENTRYPOINTS`` are included when they name
    no required module or when that module is available. Backends loaded from
    ``entrypoints`` are then merged in, replacing built-ins of the same name.
    The result is ordered by ``sort_backends``, missing
    ``open_dataset_parameters`` are filled in, and every backend class is
    instantiated.
    """
    available: dict[str, type[BackendEntrypoint]] = {
        backend_name: backend
        for backend_name, (module_name, backend) in BACKEND_ENTRYPOINTS.items()
        if module_name is None or module_available(module_name)
    }

    # Entrypoint-provided backends take precedence over built-in ones.
    external = backends_dict_from_pkg(remove_duplicates(entrypoints))
    available.update(external)

    ordered = sort_backends(available)
    set_missing_parameters(ordered)
    return {name: backend() for name, backend in ordered.items()}


@functools.lru_cache(maxsize=1)
def list_engines() -> dict[str, BackendEntrypoint]:
    """
    Return the available engines, keyed by engine name.

    The dictionary is built from the ``"xarray.backends"`` entrypoint group on
    the first call and cached; later calls return the same cached dictionary.

    Returns
    -------
    dictionary
        Maps each engine name to its BackendEntrypoint object.

    Notes
    -----
    This function lives in the backends namespace (``engs=xr.backends.list_engines()``).
    More information about an individual backend can be obtained from its entry,
    ``engs["eng_name"]``.
    """
    return build_engines(entry_points(group="xarray.backends"))


def refresh_engines() -> None:
    """
    Refresh the backend engines based on installed packages.

    Clears the cache held by ``list_engines`` so that its next call rebuilds
    the engine dictionary instead of returning the cached one.
    """
    list_engines.cache_clear()


def guess_engine(
    store_spec: str
    | os.PathLike[Any]
    | ReadBuffer
    | bytes
    | memoryview
    | AbstractDataStore,
) -> str | type[BackendEntrypoint]:
    """
    Pick the first installed engine that reports it can open ``store_spec``.

    Parameters
    ----------
    store_spec
        The object to open. It is passed unchanged to each backend's
        ``guess_can_open``.

    Returns
    -------
    str
        Name of the first engine, in ``list_engines()`` order, whose
        ``guess_can_open`` returns a true value.

    Raises
    ------
    PermissionError
        Re-raised as is when an installed backend's ``guess_can_open`` raises it.
    ValueError
        If no installed engine accepts ``store_spec``. The message depends on
        whether any known backend would accept it and on whether any engine is
        installed at all.

    Warns
    -----
    RuntimeWarning
        When a backend's ``guess_can_open`` raises any other exception; that
        backend is then skipped.
    """
    engines = list_engines()

    for engine, backend in engines.items():
        try:
            if backend.guess_can_open(store_spec):
                return engine
        except PermissionError:
            raise
        except Exception:
            warnings.warn(
                f"{engine!r} fails while guessing", RuntimeWarning, stacklevel=2
            )

    # No installed engine matched. Ask every known backend class, including
    # those filtered out of ``engines``, so the error can name the engines
    # that would be able to handle this input.
    compatible_engines = []
    for engine, (_, backend_cls) in BACKEND_ENTRYPOINTS.items():
        try:
            backend = backend_cls()
            if backend.guess_can_open(store_spec):
                compatible_engines.append(engine)
        except Exception:
            warnings.warn(
                f"{engine!r} fails while guessing", RuntimeWarning, stacklevel=2
            )

    # The "store" engine is left out of the list shown to the user.
    installed_engines = [k for k in engines if k != "store"]
    if compatible_engines:
        error_msg = (
            "found the following matches with the input file in xarray's IO "
            f"backends: {compatible_engines}. But their dependencies may not be installed, see:\n"
            "https://docs.xarray.dev/en/stable/user-guide/io.html \n"
            "https://docs.xarray.dev/en/stable/getting-started-guide/installing.html"
        )
    elif installed_engines:
        error_msg = (
            "did not find a match in any of xarray's currently installed IO "
            f"backends {installed_engines}. Consider explicitly selecting one of the "
            "installed engines via the ``engine`` parameter, or installing "
            "additional IO dependencies, see:\n"
            "https://docs.xarray.dev/en/stable/getting-started-guide/installing.html\n"
            "https://docs.xarray.dev/en/stable/user-guide/io.html"
        )
    else:
        # The user-guide link previously lacked its ".html" suffix, unlike
        # every other occurrence of the same link in this module.
        error_msg = (
            "xarray is unable to open this file because it has no currently "
            "installed IO backends. Xarray's read/write support requires "
            "installing optional IO dependencies, see:\n"
            "https://docs.xarray.dev/en/stable/getting-started-guide/installing.html\n"
            "https://docs.xarray.dev/en/stable/user-guide/io.html"
        )

    raise ValueError(error_msg)


def get_backend(engine: str | type[BackendEntrypoint]) -> BackendEntrypoint:
    """
    Return the backend object for ``engine``.

    Parameters
    ----------
    engine
        Either the name of an engine listed by ``list_engines()`` or a
        ``BackendEntrypoint`` subclass.

    Returns
    -------
    BackendEntrypoint
        The entry from ``list_engines()`` when ``engine`` is a name, otherwise
        a new instance of the given class.

    Raises
    ------
    ValueError
        If ``engine`` is a string that is not a known engine name.
    TypeError
        If ``engine`` is neither a string nor a ``BackendEntrypoint`` subclass.
    """
    if isinstance(engine, str):
        engines = list_engines()
        if engine not in engines:
            raise ValueError(
                f"unrecognized engine '{engine}' must be one of your download engines: {list(engines)}. "
                "To install additional dependencies, see:\n"
                "https://docs.xarray.dev/en/stable/user-guide/io.html \n"
                "https://docs.xarray.dev/en/stable/getting-started-guide/installing.html"
            )
        return engines[engine]

    # ``issubclass`` raises its own generic TypeError when handed a non-class
    # (e.g. a backend *instance* or ``None``). Checking for a class first lets
    # such inputs reach the descriptive error below.
    if isinstance(engine, type) and issubclass(engine, BackendEntrypoint):
        return engine()

    raise TypeError(
        "engine must be a string or a subclass of "
        f"xarray.backends.BackendEntrypoint: {engine}"
    )