1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
|
import collections.abc
from typing import Any, Dict, Hashable, Iterable, Mapping, Optional, Tuple, Union
import numpy as np
import pandas as pd
from . import formatting
from .utils import is_scalar
from .variable import Variable
def remove_unused_levels_categories(index: pd.Index) -> pd.Index:
"""
Remove unused levels from MultiIndex and unused categories from CategoricalIndex
"""
if isinstance(index, pd.MultiIndex):
index = index.remove_unused_levels()
# if it contains CategoricalIndex, we need to remove unused categories
# manually. See https://github.com/pandas-dev/pandas/issues/30846
if any(isinstance(lev, pd.CategoricalIndex) for lev in index.levels):
levels = []
for i, level in enumerate(index.levels):
if isinstance(level, pd.CategoricalIndex):
level = level[index.codes[i]].remove_unused_categories()
else:
level = level[index.codes[i]]
levels.append(level)
# TODO: calling from_array() reorders MultiIndex levels. It would
# be best to avoid this, if possible, e.g., by using
# MultiIndex.remove_unused_levels() (which does not reorder) on the
# part of the MultiIndex that is not categorical, or by fixing this
# upstream in pandas.
index = pd.MultiIndex.from_arrays(levels, names=index.names)
elif isinstance(index, pd.CategoricalIndex):
index = index.remove_unused_categories()
return index
class Indexes(collections.abc.Mapping):
"""Immutable proxy for Dataset or DataArrary indexes."""
__slots__ = ("_indexes",)
def __init__(self, indexes):
"""Not for public consumption.
Parameters
----------
indexes : Dict[Any, pandas.Index]
Indexes held by this object.
"""
self._indexes = indexes
def __iter__(self):
return iter(self._indexes)
def __len__(self):
return len(self._indexes)
def __contains__(self, key):
return key in self._indexes
def __getitem__(self, key):
return self._indexes[key]
def __repr__(self):
return formatting.indexes_repr(self)
def default_indexes(
coords: Mapping[Any, Variable], dims: Iterable
) -> Dict[Hashable, pd.Index]:
"""Default indexes for a Dataset/DataArray.
Parameters
----------
coords : Mapping[Any, xarray.Variable]
Coordinate variables from which to draw default indexes.
dims : iterable
Iterable of dimension names.
Returns
-------
Mapping from indexing keys (levels/dimension names) to indexes used for
indexing along that dimension.
"""
return {key: coords[key].to_index() for key in dims if key in coords}
def isel_variable_and_index(
name: Hashable,
variable: Variable,
index: pd.Index,
indexers: Mapping[Hashable, Union[int, slice, np.ndarray, Variable]],
) -> Tuple[Variable, Optional[pd.Index]]:
"""Index a Variable and pandas.Index together."""
if not indexers:
# nothing to index
return variable.copy(deep=False), index
if len(variable.dims) > 1:
raise NotImplementedError(
"indexing multi-dimensional variable with indexes is not supported yet"
)
new_variable = variable.isel(indexers)
if new_variable.dims != (name,):
# can't preserve a index if result has new dimensions
return new_variable, None
# we need to compute the new index
(dim,) = variable.dims
indexer = indexers[dim]
if isinstance(indexer, Variable):
indexer = indexer.data
new_index = index[indexer]
return new_variable, new_index
def roll_index(index: pd.Index, count: int, axis: int = 0) -> pd.Index:
"""Roll an pandas.Index."""
count %= index.shape[0]
if count != 0:
return index[-count:].append(index[:-count])
else:
return index[:]
def propagate_indexes(
indexes: Optional[Dict[Hashable, pd.Index]], exclude: Optional[Any] = None
) -> Optional[Dict[Hashable, pd.Index]]:
"""Creates new indexes dict from existing dict optionally excluding some dimensions."""
if exclude is None:
exclude = ()
if is_scalar(exclude):
exclude = (exclude,)
if indexes is not None:
new_indexes = {k: v for k, v in indexes.items() if k not in exclude}
else:
new_indexes = None # type: ignore
return new_indexes
|