1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225
|
import warnings
from functools import wraps, singledispatch
from typing import Mapping, Any, Sequence, Union
import h5py
import pandas as pd
import numpy as np
from scipy import sparse
from .logging import get_logger
from ._core.sparse_dataset import SparseDataset
logger = get_logger(__name__)
@singledispatch
def asarray(x):
    """
    Return `x` as a numpy array.

    This is the generic fallback of a single-dispatch function: it simply
    defers to :func:`numpy.asarray`. Type-specific conversions are attached
    through ``asarray.register``.
    """
    converted = np.asarray(x)
    return converted
@asarray.register(sparse.spmatrix)
def asarray_sparse(x):
    """`asarray` implementation for scipy sparse matrices: densify via ``toarray``."""
    dense = x.toarray()
    return dense
@asarray.register(SparseDataset)
def asarray_sparse_dataset(x):
    """
    `asarray` implementation for `SparseDataset`.

    Reads the dataset's ``value`` attribute and dispatches `asarray` again on
    whatever object that yields.
    """
    loaded = x.value
    return asarray(loaded)
@asarray.register(h5py.Dataset)
def asarray_h5py_dataset(x):
    """`asarray` implementation for `h5py.Dataset`: index with ``...`` to read it all."""
    everything = Ellipsis
    return x[everything]
@singledispatch
def convert_to_dict(obj) -> dict:
    """
    Build a `dict` from `obj`.

    Generic fallback of a single-dispatch function: anything the `dict`
    constructor accepts (a mapping or an iterable of key/value pairs) is
    passed straight to it. Type-specific conversions are attached through
    ``convert_to_dict.register``.
    """
    as_dict = dict(obj)
    return as_dict
@convert_to_dict.register(dict)
def convert_to_dict_dict(obj: dict):
    """`convert_to_dict` implementation for dicts: hand back the same object, uncopied."""
    already_a_dict = obj
    return already_a_dict
@convert_to_dict.register(np.ndarray)
def convert_to_dict_ndarray(obj: np.ndarray):
    """
    `convert_to_dict` implementation for numpy arrays with a compound dtype.

    Returns a dict mapping every field name of the dtype to ``obj[name]``.

    Raises
    ------
    TypeError
        If the array's dtype has no fields.
    """
    fields = obj.dtype.fields
    if fields is not None:
        return {field_name: obj[field_name] for field_name in fields}
    raise TypeError(
        "Can only convert np.ndarray with compound dtypes to dict, "
        f"passed array had “{obj.dtype}”."
    )
@convert_to_dict.register(type(None))
def convert_to_dict_nonetype(obj: None):
    """`convert_to_dict` implementation for ``None``: a fresh empty dict."""
    return {}
def make_index_unique(index: pd.Index, join: str = "-") -> pd.Index:
    """
    Makes the index unique by appending a number string to each duplicate index element:
    '1', '2', etc.

    If a tentative name created by the algorithm already exists in the index, it tries
    the next integer in the sequence.

    The first occurrence of a non-unique value is ignored.

    Parameters
    ----------
    index
        The index to deduplicate. It is returned unchanged (same object) when
        it is already unique.
    join
        The connecting string between name and integer.

    Returns
    -------
    The input index if it was unique, otherwise a new index with the same
    ``name`` in which every repeated value has received a suffix.

    Warns
    -----
    A warning is issued when a generated name collided with a value that was
    already present, since the suffixes then become ambiguous to read.

    Examples
    --------
    >>> from anndata import AnnData
    >>> adata = AnnData(np.ones((2, 3)), var=pd.DataFrame(index=["a", "a", "b"]))
    >>> adata.var_names
    Index(['a', 'a', 'b'], dtype='object')
    >>> adata.var_names_make_unique()
    >>> adata.var_names
    Index(['a', 'a-1', 'b'], dtype='object')
    """
    if index.is_unique:
        return index
    from collections import Counter

    values = index.values.copy()
    # Mask of every occurrence except the first one of each repeated value.
    indices_dup = index.duplicated(keep="first")
    values_dup = values[indices_dup]
    # All names taken so far; grows as new suffixed names are handed out.
    values_set = set(values)
    # Last suffix number tried per original value.
    counter = Counter()
    issue_interpretation_warning = False
    example_colliding_values = []
    for i, v in enumerate(values_dup):
        while True:
            counter[v] += 1
            tentative_new_name = v + join + str(counter[v])
            if tentative_new_name not in values_set:
                values_set.add(tentative_new_name)
                values_dup[i] = tentative_new_name
                break
            # The candidate already exists: remember it (at most five
            # examples) and try the next integer.
            issue_interpretation_warning = True
            if len(example_colliding_values) < 5:
                example_colliding_values.append(tentative_new_name)

    if issue_interpretation_warning:
        # `{delimiter}` is a literal placeholder shown to the user, so that
        # fragment is deliberately not an f-string.
        warnings.warn(
            f"Suffix used ({join}[0-9]+) to deduplicate index values may make index "
            + "values difficult to interpret. There are values with a similar suffix "
            + "in the index. Consider using a different delimiter by passing "
            + "`join={delimiter}`. "
            + "Example key collisions generated by the make_index_unique algorithm: "
            + str(example_colliding_values)
        )
    values[indices_dup] = values_dup
    index = pd.Index(values, name=index.name)
    return index
def warn_names_duplicates(attr: str):
    """
    Log, at info level, that the names of `attr` are not unique.

    The message uses "Observation" when `attr` is ``"obs"`` and "Variable"
    for any other value, and points to the matching
    ``.{attr}_names_make_unique`` method.
    """
    if attr == "obs":
        names = "Observation"
    else:
        names = "Variable"
    message = (
        f"{names} names are not unique. "
        f"To make them unique, call `.{attr}_names_make_unique`."
    )
    logger.info(message)
def ensure_df_homogeneous(
    df: pd.DataFrame, name: str
) -> Union[np.ndarray, sparse.csr_matrix]:
    """
    Turn a dataframe into a single array.

    If every column has a pandas sparse dtype, the frame is converted to a
    scipy CSR matrix; otherwise it is converted with ``DataFrame.to_numpy``.
    Whenever the columns do not all share exactly one dtype, a warning naming
    `name` and the resulting array dtype is issued.
    """
    # TODO: rename this function, I would not expect this to return a non-dataframe
    column_dtypes = df.dtypes
    every_column_sparse = all(
        isinstance(dt, pd.SparseDtype) for dt in column_dtypes
    )
    arr = df.sparse.to_coo().tocsr() if every_column_sparse else df.to_numpy()
    if column_dtypes.nunique() != 1:
        warnings.warn(f"{name} converted to numpy array with dtype {arr.dtype}")
    return arr
def convert_dictionary_to_structured_array(source: Mapping[str, Sequence[Any]]):
    """
    Pack the columns of `source` into one numpy structured array.

    Every key becomes a field name. A column with more than one dimension
    becomes a sub-array field whose shape is the column's shape without its
    first axis; a 1-dimensional column becomes a plain scalar field. Columns
    whose first element has a string dtype are cast to fixed-width unicode.

    Parameters
    ----------
    source
        Mapping from field name to column. The number of records is taken
        from the length of the first column.

    Returns
    -------
    A 1-dimensional structured array with one field per key of `source`.

    Raises
    ------
    ValueError
        If casting a string column raises `UnicodeEncodeError`.
    IndexError
        If `source` has no columns, or a column has no first element.
    """
    names = list(source.keys())
    try:  # normalise string columns to fixed-width unicode
        cols = [
            np.asarray(col)
            if np.array(col[0]).dtype.char not in {"U", "S"}
            else np.asarray(col).astype("U")
            for col in source.values()
        ]
    except UnicodeEncodeError as err:
        raise ValueError(
            "Currently only support ascii strings. "
            "Don’t use “ö” etc. for sample annotation."
        ) from err

    # One (name, base dtype[, sub-array shape]) spec per column. 1-D columns
    # carry no sub-array shape, so they are declared as scalar fields.
    dtype_list = [
        (name, str(col.dtype))
        if col.ndim == 1
        else (name, str(col.dtype), col.shape[1:])
        for name, col in zip(names, cols)
    ]
    dtype = np.dtype(dtype_list)

    arr = np.zeros((len(cols[0]),), dtype)
    # Fill field by field through plain ndarray item assignment.
    for field_spec, col in zip(dtype_list, cols):
        arr[field_spec[0]] = np.array(col, dtype=field_spec[1])

    return arr
def deprecated(new_name: str):
    """\
    This is a decorator which can be used to mark functions
    as deprecated. It will result in a warning being emitted
    when the function is used.

    Parameters
    ----------
    new_name
        Name of the replacement, quoted in the warning message.

    Returns
    -------
    A decorator. The function it produces emits a `DeprecationWarning`
    attributed to its caller on every call, then forwards all arguments to
    the wrapped function and returns its result. The produced function
    carries a ``__deprecated`` attribute set to ``True``.
    """

    def decorator(func):
        @wraps(func)
        def new_func(*args, **kwargs):
            # Force the warning through for this one call only. The context
            # manager restores the previous filters on exit, so the caller's
            # own warning configuration is not overwritten.
            with warnings.catch_warnings():
                warnings.simplefilter("always", DeprecationWarning)
                warnings.warn(
                    f"Use {new_name} instead of {func.__name__}, "
                    f"{func.__name__} will be removed in the future.",
                    category=DeprecationWarning,
                    stacklevel=2,
                )
            return func(*args, **kwargs)

        # Marker attribute; set via setattr with a string name.
        setattr(new_func, "__deprecated", True)
        return new_func

    return decorator
class DeprecationMixinMeta(type):
    """\
    Metaclass that hides deprecated members from ``dir(MyClass)``.

    A member counts as deprecated when it (or, for a property, its getter)
    carries a truthy ``__deprecated`` attribute.
    """

    def __dir__(cls):
        def is_deprecated(attr):
            # For properties the marker is looked up on the getter function.
            target = attr.fget if isinstance(attr, property) else attr
            return getattr(target, "__deprecated", False)

        visible = []
        for item in type.__dir__(cls):
            # Names that cannot be resolved on the class fall back to None,
            # which is never flagged as deprecated.
            if is_deprecated(getattr(cls, item, None)):
                continue
            visible.append(item)
        return visible
|