from copy import deepcopy
from functools import reduce
from typing import Union, Mapping, MutableMapping, Optional
from warnings import warn
import h5py
from scipy.sparse import spmatrix
import numpy as np
import pandas as pd
from ._overloaded_dict import _overloaded_uns, OverloadedDict
from .._core.index import _subset
# try importing zarr, zappy, and dask; if unavailable, define mock classes so
# that isinstance checks against these array types still work
from packaging import version
try:
from zarr.core import Array as ZarrArray
except ImportError:
class ZarrArray:
@staticmethod
def __repr__():
return "mock zarr.core.Array"
try:
from zappy.base import ZappyArray
except ImportError:
class ZappyArray:
@staticmethod
def __repr__():
return "mock zappy.base.ZappyArray"
try:
from dask.array import Array as DaskArray
except ImportError:
class DaskArray:
@staticmethod
def __repr__():
return "mock dask.array.core.Array"
try:
from typing import Literal
except ImportError:
try:
from typing_extensions import Literal
except ImportError:
class LiteralMeta(type):
def __getitem__(cls, values):
if not isinstance(values, tuple):
values = (values,)
return type("Literal_", (Literal,), dict(__args__=values))
class Literal(metaclass=LiteralMeta):
pass
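
# Quick illustration (not part of the module's API): whichever definition of
# Literal ends up in scope above, subscripting it yields a type whose allowed
# values are recorded on ``__args__``, which is what annotation code relies on.
#
#     Literal["r", "r+"].__args__  # -> ("r", "r+")
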
def _from_fixed_length_strings(value):
    """\
    Convert from fixed-length strings to unicode.

    For backwards compatibility with older h5ad and zarr files.
    """
new_dtype = []
for dt in value.dtype.descr:
dt_list = list(dt)
dt_type = dt[1]
# could probably match better
is_annotated = isinstance(dt_type, tuple)
if is_annotated:
dt_type = dt_type[0]
# Fixing issue introduced with h5py v2.10.0, see:
# https://github.com/h5py/h5py/issues/1307
if issubclass(np.dtype(dt_type).type, np.string_):
dt_list[1] = f"U{int(dt_type[2:])}"
elif is_annotated or np.issubdtype(np.dtype(dt_type), np.str_):
dt_list[1] = "O" # Assumption that it’s a vlen str
new_dtype.append(tuple(dt_list))
return value.astype(new_dtype)
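
# Example usage (an illustrative sketch; the field names are invented): a
# structured array with a fixed-length bytes field is widened to a unicode
# dtype of the same length, so values round-trip to str.
#
#     arr = np.array([(b"cell", 1.0)], dtype=[("name", "S4"), ("score", "f8")])
#     _from_fixed_length_strings(arr)["name"][0]  # -> 'cell' (field dtype '<U4')
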
def _decode_structured_array(
    arr: np.ndarray, dtype: Optional[np.dtype] = None, copy: bool = False
) -> np.ndarray:
    """
    h5py 3.0 reads all strings as bytes. h5py ships a helper to convert these
    back to str, but nothing that handles fields of structured dtypes, so this
    function does that conversion.

    Params
    ------
    arr
        An array with a structured dtype.
    dtype
        dtype of the array. This is checked for h5py string data types.
        Passing this is allowed for cases where the array may have been
        processed by another function beforehand.
    copy
        Whether to copy the array before decoding fields in place.
    """
if copy:
arr = arr.copy()
if dtype is None:
dtype = arr.dtype
# codecs.decode is 2x slower than this lambda, go figure
decode = np.frompyfunc(lambda x: x.decode("utf-8"), 1, 1)
for k, (dt, _) in dtype.fields.items():
check = h5py.check_string_dtype(dt)
if check is not None and check.encoding == "utf-8":
decode(arr[k], out=arr[k])
return arr
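
# Example usage (an illustrative sketch; names are invented): fields declared
# with h5py's utf-8 string dtype contain bytes after reading and get decoded
# in place, or on a copy when ``copy=True``.
#
#     str_dt = h5py.string_dtype(encoding="utf-8")
#     arr = np.array([(b"gene1",)], dtype=[("name", str_dt)])
#     _decode_structured_array(arr, copy=True)["name"][0]  # -> 'gene1'
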
def _to_fixed_length_strings(value: np.ndarray) -> np.ndarray:
"""\
Convert variable length strings to fixed length.
Currently a workaround for
https://github.com/zarr-developers/zarr-python/pull/422
"""
new_dtype = []
for dt_name, (dt_type, dt_offset) in value.dtype.fields.items():
if dt_type.kind == "O":
# Assuming the objects are str
size = max(len(x.encode()) for x in value.getfield("O", dt_offset))
new_dtype.append((dt_name, ("U", size)))
else:
new_dtype.append((dt_name, dt_type))
return value.astype(new_dtype)
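
# Example usage (an illustrative sketch): object-dtype fields are assumed to
# hold str and are sized to the longest encoded value; other fields pass
# through unchanged.
#
#     arr = np.array([("hello",), ("hi",)], dtype=[("s", object)])
#     _to_fixed_length_strings(arr).dtype["s"]  # -> dtype('<U5')
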
#############################
# Dealing with uns
#############################
def _clean_uns(d: Mapping[str, MutableMapping[str, Union[pd.Series, str, int]]]):
    """
    Compat function for when categorical keys were stored in uns.

    This was buggy: when obs and var stored categorical columns under the same
    column name, only one `<colname>_categories` entry was retained.
    """
k_to_delete = set()
for cats_name, cats in d.get("uns", {}).items():
if not cats_name.endswith("_categories"):
continue
name = cats_name.replace("_categories", "")
        # categories saved as a scalar (a single category); wrap in a list
if isinstance(cats, (str, int)):
cats = [cats]
for ann in ["obs", "var"]:
if name not in d[ann]:
continue
codes: np.ndarray = d[ann][name].values
# hack to maybe find the axis the categories were for
if not np.all(codes < len(cats)):
continue
d[ann][name] = pd.Categorical.from_codes(codes, cats)
k_to_delete.add(cats_name)
for cats_name in k_to_delete:
del d["uns"][cats_name]
def _move_adj_mtx(d):
"""
Read-time fix for moving adjacency matrices from uns to obsp
"""
n = d.get("uns", {}).get("neighbors", {})
obsp = d.setdefault("obsp", {})
for k in ("distances", "connectivities"):
if (
(k in n)
and isinstance(n[k], (spmatrix, np.ndarray))
and len(n[k].shape) == 2
):
warn(
f"Moving element from .uns['neighbors']['{k}'] to .obsp['{k}'].\n\n"
"This is where adjacency matrices should go now.",
FutureWarning,
)
obsp[k] = n.pop(k)
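
# Example usage (an illustrative sketch): a 2-d adjacency matrix found under
# ``uns["neighbors"]`` is relocated to ``obsp``, with a FutureWarning.
#
#     from scipy import sparse
#     d = {"uns": {"neighbors": {"distances": sparse.eye(3, format="csr")}}}
#     _move_adj_mtx(d)  # warns
#     list(d["obsp"])  # -> ['distances']
#     "distances" in d["uns"]["neighbors"]  # -> False
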
def _find_sparse_matrices(d: Mapping, n: int, keys: tuple, paths: list):
"""Find paths to sparse matrices with shape (n, n)."""
for k, v in d.items():
if isinstance(v, Mapping):
_find_sparse_matrices(v, n, (*keys, k), paths)
elif isinstance(v, spmatrix) and v.shape == (n, n):
paths.append((*keys, k))
return paths
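
# Example usage (an illustrative sketch; key names invented): recurses through
# nested mappings and returns the key path of every n × n sparse matrix.
#
#     from scipy import sparse
#     uns = {"nested": {"mtx": sparse.eye(4, format="csr")}, "other": 1}
#     _find_sparse_matrices(uns, 4, (), [])  # -> [('nested', 'mtx')]
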
def _slice_uns_sparse_matrices(uns: MutableMapping, oidx: "Index1d", orig_n_obs: int):
    """Slice sparse matrices of shape n_obs × n_obs stored in uns."""
if isinstance(oidx, slice) and len(range(*oidx.indices(orig_n_obs))) == orig_n_obs:
return uns # slice of entire dimension is a no-op
paths = _find_sparse_matrices(uns, orig_n_obs, (), [])
if not paths:
return uns
uns = deepcopy(uns)
for path in paths:
str_path = "".join(f"['{key}']" for key in path)
warn(
f"During AnnData slicing, found matrix at .uns{str_path} that happens"
f" to be dimensioned at n_obs×n_obs ({orig_n_obs}×{orig_n_obs}).\n\n"
"These matrices should now be stored in the .obsp attribute.\n"
"This slicing behavior will be removed in anndata 0.8.",
FutureWarning,
)
d = reduce(lambda d, k: d[k], path[:-1], uns)
d[path[-1]] = _subset(d[path[-1]], (oidx, oidx))
return uns
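
# Example usage (an illustrative sketch; key names invented): slicing a proper
# subset of observations also slices any n_obs × n_obs matrix in uns, on a
# deep copy, with a FutureWarning.
#
#     from scipy import sparse
#     uns = {"nested": {"mtx": sparse.eye(4, format="csr")}}
#     sliced = _slice_uns_sparse_matrices(uns, slice(0, 2), 4)  # warns
#     sliced["nested"]["mtx"].shape  # -> (2, 2)
#     uns["nested"]["mtx"].shape  # -> (4, 4); the original is untouched
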