import collections.abc as cabc
from functools import singledispatch
from itertools import repeat
from typing import Union, Sequence, Optional, Tuple
import numpy as np
import pandas as pd
from scipy.sparse import spmatrix, issparse
# A single-axis indexer: positional (slice / int / integer array), label-based
# (str), or a numpy array (integer positions or boolean mask).
Index1D = Union[slice, int, str, np.int64, np.ndarray]
# A full index: one axis alone, a (rows, cols) pair, or a sparse matrix mask.
Index = Union[Index1D, Tuple[Index1D, Index1D], spmatrix]
def _normalize_indices(
    index: Optional[Index], names0: pd.Index, names1: pd.Index
) -> Tuple[slice, slice]:
    """Resolve a user-facing index into one positional indexer per axis.

    ``names0``/``names1`` are the label indexes for axis 0 and axis 1; the
    returned pair contains slices, ints, or integer ndarrays suitable for
    direct positional indexing.
    """
    # A 1-tuple (e.g. from a trailing comma) means the same as its element.
    if isinstance(index, tuple) and len(index) == 1:
        index = index[0]
    # Strip the pandas wrapper; only the underlying values are used here.
    if isinstance(index, pd.Series):
        index: Index = index.values
    if isinstance(index, tuple):
        if len(index) > 2:
            raise ValueError("AnnData can only be sliced in rows and columns.")
        # TODO: The series should probably be aligned first
        first, second = index[0], index[1]
        if isinstance(second, pd.Series):
            second = second.values
        if isinstance(first, pd.Series):
            first = first.values
        index = first, second
    rows, cols = unpack_index(index)
    return _normalize_index(rows, names0), _normalize_index(cols, names1)
def _normalize_index(
indexer: Union[
slice,
np.integer,
int,
str,
Sequence[Union[int, np.integer]],
np.ndarray,
pd.Index,
],
index: pd.Index,
) -> Union[slice, int, np.ndarray]: # ndarray of int
if not isinstance(index, pd.RangeIndex):
assert (
index.dtype != float and index.dtype != int
), "Don’t call _normalize_index with non-categorical/string names"
# the following is insanely slow for sequences,
# we replaced it using pandas below
def name_idx(i):
if isinstance(i, str):
i = index.get_loc(i)
return i
if isinstance(indexer, slice):
start = name_idx(indexer.start)
stop = name_idx(indexer.stop)
# string slices can only be inclusive, so +1 in that case
if isinstance(indexer.stop, str):
stop = None if stop is None else stop + 1
step = indexer.step
return slice(start, stop, step)
elif isinstance(indexer, (np.integer, int)):
return indexer
elif isinstance(indexer, str):
return index.get_loc(indexer) # int
elif isinstance(indexer, (Sequence, np.ndarray, pd.Index, spmatrix, np.matrix)):
if hasattr(indexer, "shape") and (
(indexer.shape == (index.shape[0], 1))
or (indexer.shape == (1, index.shape[0]))
):
if isinstance(indexer, spmatrix):
indexer = indexer.toarray()
indexer = np.ravel(indexer)
if not isinstance(indexer, (np.ndarray, pd.Index)):
indexer = np.array(indexer)
if issubclass(indexer.dtype.type, (np.integer, np.floating)):
return indexer # Might not work for range indexes
elif issubclass(indexer.dtype.type, np.bool_):
if indexer.shape != index.shape:
raise IndexError(
f"Boolean index does not match AnnData’s shape along this "
f"dimension. Boolean index has shape {indexer.shape} while "
f"AnnData index has shape {index.shape}."
)
positions = np.where(indexer)[0]
return positions # np.ndarray[int]
else: # indexer should be string array
positions = index.get_indexer(indexer)
if np.any(positions < 0):
not_found = indexer[positions < 0]
raise KeyError(
f"Values {list(not_found)}, from {list(indexer)}, "
"are not valid obs/ var names or indices."
)
return positions # np.ndarray[int]
else:
raise IndexError(f"Unknown indexer {indexer!r} of type {type(indexer)}")
def unpack_index(index: Index) -> Tuple[Index1D, Index1D]:
    """Expand ``index`` to a (rows, cols) pair, padding with full slices."""
    if isinstance(index, tuple):
        if len(index) == 2:
            return index
        if len(index) == 1:
            return index[0], slice(None)
        raise IndexError("invalid number of indices")
    # A bare (non-tuple) index selects rows; columns default to everything.
    return index, slice(None)
@singledispatch
def _subset(a: Union[np.ndarray, pd.DataFrame], subset_idx: Index):
    """Subset ``a`` treating per-axis indexers as independent selections."""
    # Plain ndarray indexing would pair two integer arrays element-wise
    # (coordinates); np.ix_ makes them select the cross product instead.
    if all(isinstance(axis_idx, cabc.Iterable) for axis_idx in subset_idx):
        subset_idx = np.ix_(*subset_idx)
    return a[subset_idx]
@_subset.register(spmatrix)
def _subset_spmatrix(a: spmatrix, subset_idx: Index):
    """Subset a sparse matrix with independent per-axis selections."""
    axes = subset_idx
    # Sparse matrices lack np.ix_ support: reshape the row indexer into a
    # column vector so two 1-D indexers broadcast to a 2-D selection.
    if len(axes) > 1 and all(isinstance(axis_idx, cabc.Iterable) for axis_idx in axes):
        subset_idx = (axes[0].reshape(-1, 1), *axes[1:])
    return a[subset_idx]
@_subset.register(pd.DataFrame)
def _subset_df(df: pd.DataFrame, subset_idx: Index):
    # DataFrames already treat a tuple of positional indexers per-axis via
    # .iloc, so the indexer can be passed through unchanged.
    return df.iloc[subset_idx]
def make_slice(idx, dimidx, n=2):
    """Return an ``n``-tuple of full slices with ``idx`` placed at ``dimidx``."""
    full = [slice(None)] * n
    full[dimidx] = idx
    return tuple(full)
def get_vector(adata, k, coldim, idxdim, layer=None):
    """Return a 1-D vector for key ``k`` along the ``idxdim`` axis.

    ``k`` may name a column of ``adata.<coldim>`` (annotation column) or an
    entry of ``adata.<idxdim>_names``; in the latter case the matching
    row/column is sliced out of X (or ``layer``) and densified.

    Parameters
    ----------
    adata
        AnnData-like object. (adata could be self if Raw and AnnData shared
        a parent.)
    k
        Annotation-column name or obs/var name to extract.
    coldim, idxdim
        ``"obs"`` or ``"var"``: which annotation table / axis to look in.
    layer
        Optional layer to read from instead of X.

    Raises
    ------
    ValueError
        If ``k`` is ambiguous (found in both lookups).
    KeyError
        If ``k`` is found in neither lookup.
    """
    dims = ("obs", "var")
    col = getattr(adata, coldim).columns
    idx = getattr(adata, f"{idxdim}_names")

    in_col = k in col
    in_idx = k in idx

    # Explicit boolean logic instead of the original `(in_col + in_idx) == 2`
    # arithmetic — same truth table, clearer intent.
    if in_col and in_idx:
        raise ValueError(
            f"Key {k} could be found in both .{idxdim}_names and .{coldim}.columns"
        )
    if not (in_col or in_idx):
        raise KeyError(
            f"Could not find key {k} in .{idxdim}_names or .{coldim}.columns."
        )
    if in_col:
        return getattr(adata, coldim)[k].values
    # k names an obs/var entry: slice that row/column out of the data matrix.
    selected_dim = dims.index(idxdim)
    slice_idx = adata._normalize_indices(make_slice(k, selected_dim))
    a = adata._get_X(layer=layer)[slice_idx]
    if issparse(a):
        a = a.toarray()
    return np.ravel(a)