1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
|
from typing import Union, Mapping, Sequence, Tuple
import h5py
import numpy as np
import pandas as pd
from scipy import sparse
from scipy.sparse import issparse
from . import anndata
from .index import _normalize_index, _subset, unpack_index, get_vector
from .aligned_mapping import AxisArrays, AxisArraysView
from .sparse_dataset import SparseDataset
# TODO: Implement views for Raw
class Raw:
    """Holder for a "raw" data matrix together with its own ``var``/``varm``.

    The observation axis is shared with a parent AnnData object: ``n_obs`` is
    captured from it at construction time and ``obs_names`` is read from it on
    access. When the parent is backed, ``X`` is not stored on this object but
    read from the parent's file on every access (see :attr:`X`).
    """

    def __init__(
        self,
        adata: "anndata.AnnData",
        X: Union[np.ndarray, sparse.spmatrix, None] = None,
        var: Union[pd.DataFrame, Mapping[str, Sequence], None] = None,
        varm: Union[AxisArrays, Mapping[str, np.ndarray], None] = None,
    ):
        """Build a Raw either from explicit pieces or as a copy of ``adata``.

        Parameters
        ----------
        adata
            Parent object. Supplies ``n_obs``, ``obs_names`` and, in backed
            mode, the file that ``X`` is read from.
        X
            Data matrix. Must be ``None`` when ``adata`` is backed. When
            ``None`` and ``adata`` is not backed, ``X``, ``var`` and ``varm``
            are all copied from ``adata`` and the ``var``/``varm`` arguments
            are ignored.
        var
            Variable annotation, passed through ``_gen_dataframe``.
        varm
            Multi-dimensional variable annotation, wrapped in ``AxisArrays``.

        Raises
        ------
        ValueError
            If ``X`` is given although ``adata`` is backed.
        """
        # Local import — presumably avoids a circular import at module load
        # time; TODO confirm.
        from .anndata import _gen_dataframe

        self._adata = adata
        self._n_obs = adata.n_obs
        # construct manually: either (backed, no X) or (in memory, X given)
        if adata.isbacked == (X is None):
            self._X = X
            # ``self.X`` (the property) is used for the shape so that the
            # backed case reads it from the file.
            self._var = _gen_dataframe(var, self.X.shape[1], ["var_names"])
            self._varm = AxisArrays(self, 1, varm)
        elif X is None:  # construct from adata
            self._X = adata.X.copy()
            self._var = adata.var.copy()
            self._varm = AxisArrays(self, 1, adata.varm.copy())
        elif adata.isbacked:
            raise ValueError("Cannot specify X if adata is backed")

    def _get_X(self, layer=None):
        """Return :attr:`X`; any non-``None`` ``layer`` is rejected.

        Raises
        ------
        ValueError
            If ``layer`` is not ``None``.
        """
        if layer is not None:
            raise ValueError(
                f"Raw has no layers, but layer={layer!r} was requested."
            )
        return self.X

    @property
    def X(self) -> Union[SparseDataset, np.ndarray, sparse.spmatrix]:
        """The data matrix.

        In memory mode this is the stored matrix. In backed mode the parent's
        file is opened if necessary and the dataset is looked up under
        ``"raw/X"`` (or the legacy key ``"raw.X"``); an ``h5py.Group`` is
        wrapped in a ``SparseDataset``. If the parent is a view, the rows are
        subset with the parent's ``_oidx``.

        Raises
        ------
        AttributeError
            If neither key exists in the backing file.
        """
        # TODO: Handle unsorted array of integer indices for h5py.Datasets
        if not self._adata.isbacked:
            return self._X
        if not self._adata.file.is_open:
            self._adata.file.open()
        # Handle legacy file formats:
        if "raw/X" in self._adata.file:
            X = self._adata.file["raw/X"]
        elif "raw.X" in self._adata.file:
            X = self._adata.file["raw.X"]  # Backwards compat
        else:
            raise AttributeError(
                "Could not find dataset for raw X in file: "
                f"{self._adata.file.filename}."
            )
        if isinstance(X, h5py.Group):
            X = SparseDataset(X)
        # Check if we need to subset
        if self._adata.is_view:
            # TODO: As noted above, implement views of raw
            #       so we can know if we need to subset by var
            return X[self._adata._oidx, slice(None)]
        else:
            return X

    @property
    def shape(self):
        """Tuple ``(n_obs, n_vars)``."""
        return self.n_obs, self.n_vars

    @property
    def var(self):
        """The variable annotation stored on this object."""
        return self._var

    @property
    def n_vars(self):
        """Number of rows of :attr:`var`."""
        return self._var.shape[0]

    @property
    def n_obs(self):
        """Number of observations, as captured from the parent in ``__init__``."""
        return self._n_obs

    @property
    def varm(self):
        """The ``AxisArrays`` of multi-dimensional variable annotation."""
        return self._varm

    @property
    def var_names(self):
        """Index of :attr:`var`."""
        return self.var.index

    @property
    def obs_names(self):
        """``obs_names`` of the parent object."""
        return self._adata.obs_names

    def __getitem__(self, index):
        """Return a new ``Raw`` subset by ``index`` (obs and/or var).

        In backed mode ``X`` is not subset here: ``X=None`` is passed on and
        the new object reads ``X`` from the parent's file.
        """
        oidx, vidx = self._normalize_indices(index)

        # To preserve two dimensional shape
        if isinstance(vidx, (int, np.integer)):
            vidx = slice(vidx, vidx + 1, 1)
        if isinstance(oidx, (int, np.integer)):
            oidx = slice(oidx, oidx + 1, 1)

        if not self._adata.isbacked:
            X = _subset(self.X, (oidx, vidx))
        else:
            X = None

        var = self._var.iloc[vidx]
        new = Raw(self._adata, X=X, var=var)
        if self._varm is not None:
            # Since there is no view of raws
            new._varm = self._varm._view(_RawViewHack(self, vidx), (vidx,)).copy()
        return new

    def __str__(self):
        """Describe the dimensions and the keys of ``var`` and ``varm``."""
        descr = f"Raw AnnData with n_obs × n_vars = {self.n_obs} × {self.n_vars}"
        for attr in ["var", "varm"]:
            keys = getattr(self, attr).keys()
            if len(keys) > 0:
                descr += f"\n {attr}: {str(list(keys))[1:-1]}"
        return descr

    def copy(self):
        """Return a new ``Raw`` with copies of ``X``, ``var`` and ``varm``.

        The copy refers to the same parent object. For a backed parent there
        is no in-memory ``X`` to copy, so the copy reads ``X`` from the same
        file.
        """
        # ``_X`` is None when constructed for a backed parent; calling
        # ``.copy()`` on it would raise AttributeError, and ``__init__``
        # rejects an explicit X for a backed parent anyway.
        X = None if self._X is None else self._X.copy()
        return Raw(
            self._adata,
            X=X,
            var=self._var.copy(),
            varm=None if self._varm is None else self._varm.copy(),
        )

    def to_adata(self):
        """Create full AnnData object.

        ``X``, ``var`` and ``varm`` are copied from this object; ``obs``,
        ``obsm`` and ``uns`` are copied from the parent.
        """
        # NOTE(review): this reads ``self._X`` directly, which is None when the
        # parent is backed, so ``.copy()`` fails there — presumably this is
        # only meant for memory mode; confirm.
        return anndata.AnnData(
            X=self._X.copy(),
            var=self._var.copy(),
            varm=None if self._varm is None else self._varm.copy(),
            obs=self._adata.obs.copy(),
            obsm=self._adata.obsm.copy(),
            uns=self._adata.uns.copy(),
        )

    def _normalize_indices(self, packed_index):
        """Turn a user index into a normalized ``(obs, var)`` pair.

        ``pd.Series`` indexers are replaced by their ``.values`` before the
        index is unpacked and each axis is normalized against the matching
        names.

        Raises
        ------
        IndexDimError
            If ``packed_index`` is a tuple whose length is not 2.
        """
        # deal with slicing with pd.Series
        if isinstance(packed_index, pd.Series):
            packed_index = packed_index.values
        if isinstance(packed_index, tuple):
            if len(packed_index) != 2:
                raise IndexDimError(len(packed_index))
            if isinstance(packed_index[1], pd.Series):
                packed_index = packed_index[0], packed_index[1].values
            if isinstance(packed_index[0], pd.Series):
                packed_index = packed_index[0].values, packed_index[1]
        obs, var = unpack_index(packed_index)
        obs = _normalize_index(obs, self._adata.obs_names)
        var = _normalize_index(var, self.var_names)
        return obs, var

    def var_vector(self, k: str) -> np.ndarray:
        """Delegate to ``get_vector(self, k, "var", "obs")``."""
        # TODO decorator to copy AnnData.var_vector docstring
        return get_vector(self, k, "var", "obs")

    def obs_vector(self, k: str) -> np.ndarray:
        """Return the values of ``X`` for variable ``k`` as a flat array.

        Sparse results are densified before flattening.
        """
        # TODO decorator to copy AnnData.obs_vector docstring
        idx = self._normalize_indices((slice(None), k))
        a = self.X[idx]
        if issparse(a):
            a = a.toarray()
        return np.ravel(a)
# This exists to accommodate AlignedMappings,
# until we implement a proper RawView or get rid of Raw in favor of modes.
class _RawViewHack:
    """Stand-in parent handed to ``AxisArrays._view`` by ``Raw.__getitem__``.

    It wraps a ``Raw`` and a variable index and exposes ``shape``,
    ``obs_names`` and ``var_names`` as they would look after applying that
    index on the variable axis.
    """

    def __init__(self, raw: Raw, vidx: Union[slice, np.ndarray]):
        """Remember the wrapped ``Raw`` and the variable indexer."""
        self.parent_raw, self.vidx = raw, vidx

    @property
    def shape(self) -> Tuple[int, int]:
        """``(n_obs of the wrapped Raw, number of selected variables)``."""
        n_obs = self.parent_raw.n_obs
        n_vars = len(self.var_names)
        return n_obs, n_vars

    @property
    def obs_names(self) -> pd.Index:
        """``obs_names`` of the wrapped ``Raw``, unchanged."""
        wrapped = self.parent_raw
        return wrapped.obs_names

    @property
    def var_names(self) -> pd.Index:
        """``var_names`` of the wrapped ``Raw`` indexed by ``vidx``."""
        all_names = self.parent_raw.var_names
        return all_names[self.vidx]
class IndexDimError(IndexError):
    """Index error for an index tuple that does not have exactly 2 entries.

    The message reports the number of dimensions that were supplied. For a
    1-dimensional index an extra hint about the ``adata[cells, :]`` slicing
    syntax is appended.
    """

    # The trailing space matters: the two literals are concatenated, and
    # without it the message read "with an3-dimensional index".
    MSG = (
        "You tried to slice an AnnData(View) object with a "
        "{}-dimensional index, but only 2 dimensions exist in such an object."
    )
    MSG_1D = (
        "\nIf you tried to slice cells using adata[cells, ], "
        "note that Python (unlike R) uses adata[cells, :] as slicing syntax."
    )

    def __init__(self, n_dims: int):
        """Build the message for an index with ``n_dims`` entries."""
        msg = self.MSG.format(n_dims)
        if n_dims == 1:
            msg += self.MSG_1D
        super().__init__(msg)
|