File: raw.py

package info (click to toggle)
python-anndata 0.7.5%2Bds-3
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 628 kB
  • sloc: python: 7,779; makefile: 8
file content (211 lines) | stat: -rw-r--r-- 6,787 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
from typing import Union, Mapping, Sequence, Tuple

import h5py
import numpy as np
import pandas as pd
from scipy import sparse
from scipy.sparse import issparse

from . import anndata
from .index import _normalize_index, _subset, unpack_index, get_vector
from .aligned_mapping import AxisArrays, AxisArraysView
from .sparse_dataset import SparseDataset


# TODO: Implement views for Raw
class Raw:
    """Store an ``X``/``var``/``varm`` triple that hangs off a parent AnnData.

    The observation axis is shared with the parent: ``n_obs`` is captured
    from ``adata`` at construction and ``obs_names`` is always read from it,
    while ``var`` and ``varm`` are owned by this object. If the parent is
    backed, no ``X`` is kept here; the ``X`` property reads it from the
    parent's file on every access.
    """

    def __init__(
        self,
        adata: "anndata.AnnData",
        X: Union[np.ndarray, sparse.spmatrix, None] = None,
        var: Union[pd.DataFrame, Mapping[str, Sequence], None] = None,
        varm: Union["AxisArrays", Mapping[str, np.ndarray], None] = None,
    ):
        """Build a Raw either from explicit parts or from ``adata`` itself.

        Parameters
        ----------
        adata
            Parent object; supplies ``n_obs``, ``obs_names`` and, when
            backed, the file that ``X`` is read from.
        X
            Data matrix. Must be ``None`` when ``adata`` is backed. When
            ``adata`` is in memory and ``X`` is ``None``, ``X``, ``var`` and
            ``varm`` are all copied from ``adata`` and the ``var``/``varm``
            arguments are ignored.
        var
            Variable annotation used when constructing from explicit parts.
        varm
            Multi-dimensional variable annotation used when constructing
            from explicit parts.

        Raises
        ------
        ValueError
            If ``X`` is given although ``adata`` is backed.
        """
        from .anndata import _gen_dataframe

        self._adata = adata
        self._n_obs = adata.n_obs
        # construct manually: either (backed, X is None) or (in memory, X given)
        if adata.isbacked == (X is None):
            self._X = X
            # self.X (not self._X) so that the backed case reads the shape
            # from the file
            self._var = _gen_dataframe(var, self.X.shape[1], ["var_names"])
            self._varm = AxisArrays(self, 1, varm)
        elif X is None:  # construct from adata
            self._X = adata.X.copy()
            self._var = adata.var.copy()
            self._varm = AxisArrays(self, 1, adata.varm.copy())
        elif adata.isbacked:
            raise ValueError("Cannot specify X if adata is backed")

    def _get_X(self, layer=None):
        """Return ``X``; any ``layer`` other than ``None`` is rejected.

        Raises
        ------
        ValueError
            If ``layer`` is not ``None``.
        """
        if layer is not None:
            raise ValueError(
                f"Raw does not support layers, but layer={layer!r} was requested."
            )
        return self.X

    @property
    def X(self) -> Union["SparseDataset", np.ndarray, sparse.spmatrix]:
        """Data matrix, read from the parent's file when the parent is backed.

        Raises
        ------
        AttributeError
            If the parent is backed and its file contains neither ``raw/X``
            nor the legacy ``raw.X`` entry.
        """
        # TODO: Handle unsorted array of integer indices for h5py.Datasets
        if not self._adata.isbacked:
            return self._X
        if not self._adata.file.is_open:
            self._adata.file.open()
        # Handle legacy file formats:
        if "raw/X" in self._adata.file:
            X = self._adata.file["raw/X"]
        elif "raw.X" in self._adata.file:
            X = self._adata.file["raw.X"]  # Backwards compat
        else:
            raise AttributeError(
                f"Could not find dataset for raw X in file: "
                f"{self._adata.file.filename}."
            )
        # A group (rather than a dataset) is wrapped as a sparse dataset
        if isinstance(X, h5py.Group):
            X = SparseDataset(X)
        # Check if we need to subset
        if self._adata.is_view:
            # TODO: As noted above, implement views of raw
            #       so we can know if we need to subset by var
            return X[self._adata._oidx, slice(None)]
        else:
            return X

    @property
    def shape(self):
        """Tuple ``(n_obs, n_vars)``."""
        return self.n_obs, self.n_vars

    @property
    def var(self):
        """Variable annotation owned by this object."""
        return self._var

    @property
    def n_vars(self):
        """Number of variables, i.e. the number of rows of ``var``."""
        return self._var.shape[0]

    @property
    def n_obs(self):
        """Number of observations, as captured from the parent at construction."""
        return self._n_obs

    @property
    def varm(self):
        """Multi-dimensional variable annotation owned by this object."""
        return self._varm

    @property
    def var_names(self):
        """Index of ``var``."""
        return self.var.index

    @property
    def obs_names(self):
        """Observation names, read from the parent."""
        return self._adata.obs_names

    def __getitem__(self, index):
        """Return a new Raw restricted to ``index``.

        Integer indices are widened to length-one slices so that the result
        stays two-dimensional.
        """
        oidx, vidx = self._normalize_indices(index)

        # To preserve two dimensional shape
        if isinstance(vidx, (int, np.integer)):
            vidx = slice(vidx, vidx + 1, 1)
        if isinstance(oidx, (int, np.integer)):
            oidx = slice(oidx, oidx + 1, 1)

        if not self._adata.isbacked:
            X = _subset(self.X, (oidx, vidx))
        else:
            # NOTE: for a backed parent only var/varm are subset here; the
            # new object still reads X from the file (see the views TODO).
            X = None

        var = self._var.iloc[vidx]
        new = Raw(self._adata, X=X, var=var)
        if self._varm is not None:
            # Since there is no view of raws
            new._varm = self._varm._view(_RawViewHack(self, vidx), (vidx,)).copy()
        return new

    def __str__(self):
        """Summarise the shape and the non-empty ``var``/``varm`` keys."""
        descr = f"Raw AnnData with n_obs × n_vars = {self.n_obs} × {self.n_vars}"
        for attr in ["var", "varm"]:
            keys = getattr(self, attr).keys()
            if len(keys) > 0:
                descr += f"\n    {attr}: {str(list(keys))[1:-1]}"
        return descr

    def _materialize_X(self):
        """Return ``X`` as an in-memory object that a new owner may keep.

        For an in-memory parent this is a copy of the stored matrix. For a
        backed parent the data is read from the file, because nothing is
        stored on this object in that case.
        """
        if not self._adata.isbacked:
            return self._X.copy()
        X = self.X
        if isinstance(X, SparseDataset):
            # NOTE(review): relies on SparseDataset.to_memory(), which is
            # defined outside this file - confirm it is available.
            return X.to_memory()
        if isinstance(X, h5py.Dataset):
            return X[()]  # read the whole dataset
        # Backed view: the X property already indexed the on-disk data.
        return X

    def copy(self):
        """Return a new Raw with copied ``var``/``varm`` (and ``X`` if in memory).

        For a backed parent no ``X`` is passed on: none is stored here and
        the constructor rejects an explicit ``X`` in that case, so the copy
        keeps reading ``X`` from the parent's file.
        """
        return Raw(
            self._adata,
            X=None if self._adata.isbacked else self._X.copy(),
            var=self._var.copy(),
            varm=None if self._varm is None else self._varm.copy(),
        )

    def to_adata(self):
        """Create full AnnData object."""
        return anndata.AnnData(
            # self._X is None for a backed parent, so go through the helper
            X=self._materialize_X(),
            var=self._var.copy(),
            varm=None if self._varm is None else self._varm.copy(),
            obs=self._adata.obs.copy(),
            obsm=self._adata.obsm.copy(),
            uns=self._adata.uns.copy(),
        )

    def _normalize_indices(self, packed_index):
        """Turn a user-supplied index into an ``(obs, var)`` pair.

        ``pd.Series`` indexers are replaced by their ``.values`` first; the
        two parts are then normalized against the parent's ``obs_names`` and
        this object's ``var_names``.

        Raises
        ------
        IndexDimError
            If ``packed_index`` is a tuple whose length is not 2.
        """
        # deal with slicing with pd.Series
        if isinstance(packed_index, pd.Series):
            packed_index = packed_index.values
        if isinstance(packed_index, tuple):
            if len(packed_index) != 2:
                raise IndexDimError(len(packed_index))
            if isinstance(packed_index[1], pd.Series):
                packed_index = packed_index[0], packed_index[1].values
            if isinstance(packed_index[0], pd.Series):
                packed_index = packed_index[0].values, packed_index[1]
        obs, var = unpack_index(packed_index)
        obs = _normalize_index(obs, self._adata.obs_names)
        var = _normalize_index(var, self.var_names)
        return obs, var

    def var_vector(self, k: str) -> np.ndarray:
        """Look up ``k`` via ``get_vector(self, k, "var", "obs")``."""
        # TODO decorator to copy AnnData.var_vector docstring
        return get_vector(self, k, "var", "obs")

    def obs_vector(self, k: str) -> np.ndarray:
        """Return the ``X`` values of variable ``k`` as a flat dense array."""
        # TODO decorator to copy AnnData.obs_vector docstring
        idx = self._normalize_indices((slice(None), k))
        a = self.X[idx]
        if issparse(a):
            a = a.toarray()
        return np.ravel(a)


# This exists to accommodate AlignedMappings,
# until we implement a proper RawView or get rid of Raw in favor of modes.
class _RawViewHack:
    """Lightweight stand-in exposing the axis info of a var-subset of a Raw.

    It only remembers the originating Raw and the variable index, and
    derives ``shape``, ``obs_names`` and ``var_names`` from them on demand.
    """

    def __init__(self, raw: Raw, vidx: Union[slice, np.ndarray]):
        self.parent_raw, self.vidx = raw, vidx

    @property
    def shape(self) -> Tuple[int, int]:
        """``(n_obs of the parent Raw, number of selected variables)``."""
        n_obs = self.parent_raw.n_obs
        n_vars = len(self.var_names)
        return n_obs, n_vars

    @property
    def obs_names(self) -> pd.Index:
        """Observation names, taken unchanged from the parent Raw."""
        source = self.parent_raw
        return source.obs_names

    @property
    def var_names(self) -> pd.Index:
        """The parent Raw's variable names indexed by ``vidx``."""
        all_names = self.parent_raw.var_names
        return all_names[self.vidx]


class IndexDimError(IndexError):
    """Error for an index whose number of dimensions is not two.

    The message is built from ``MSG`` (formatted with the offending number
    of dimensions); ``MSG_1D`` is appended as a hint when that number is 1.
    """

    # The first fragment ends with a space: adjacent string literals are
    # joined verbatim, and without it the message read "with an3-dimensional".
    MSG = (
        "You tried to slice an AnnData(View) object with a "
        "{}-dimensional index, but only 2 dimensions exist in such an object."
    )
    MSG_1D = (
        "\nIf you tried to slice cells using adata[cells, ], "
        "note that Python (unlike R) uses adata[cells, :] as slicing syntax."
    )

    def __init__(self, n_dims: int):
        """Compose the message for an index with ``n_dims`` dimensions."""
        msg = self.MSG.format(n_dims)
        if n_dims == 1:
            msg += self.MSG_1D
        super().__init__(msg)