File: dataframe.py

package info (click to toggle)
python-fastparquet 2024.2.0-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 120,180 kB
  • sloc: python: 8,181; makefile: 187
file content (267 lines) | stat: -rw-r--r-- 11,010 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
import re
from collections import OrderedDict
from packaging.version import Version
import numpy as np
from pandas import (
    Categorical, DataFrame, Series,
    CategoricalIndex, RangeIndex, Index, MultiIndex,
    DatetimeIndex, CategoricalDtype,
    DatetimeTZDtype
)
from pandas.core.arrays.masked import BaseMaskedDtype
import warnings

from fastparquet.util import PANDAS_VERSION


class Dummy:
    """Bare attribute holder.

    ``empty`` creates instances of this class for multi-index levels and
    attaches ``_set_categories`` and ``_multiindex`` attributes to them.
    """


def empty(types, size, cats=None, cols=None, index_types=None, index_names=None,
          timezones=None, columns_dtype=None):
    """
    Create empty DataFrame to assign into

    In the simplest case, will return a Pandas dataframe of the given size,
    with columns of the given names and types. The second return value `views`
    is a dictionary of numpy arrays into which you can assign values that
    show up in the dataframe.

    For categorical columns, you get two views to assign into: if the
    column name is "col", you get both "col" (the category codes) and
    "col-catdef" (the Categorical holding the category labels).

    For a single categorical index, "<name>-catdef" is the Categorical
    backing the index; replace its categories with an Index of the real
    values once they are known.

    Multi-indexes work a lot like categoricals, even if the types of each
    index are not themselves categories, and will also have "-catdef" entries
    in the views. However, these will be Dummy instances, providing only a
    ``._set_categories(values)`` callable (note the leading underscore) and a
    ``._multiindex = True`` marker. The callable installs ``values`` (an
    Index) as that level, and raises RuntimeError if a different set of
    values was already installed for the level.

    Parameters
    ----------
    types: str or sequence of dtype-likes
        One entry per column, e.g. 'i4,u2,f4,f2,f4,M8,m8' (a comma-separated
        string is split on ','). An entry whose ``str()`` is 'category'
        creates a categorical column; a pandas masked dtype creates a masked
        array column; anything else is handed to numpy. Specs that contain
        "M" or "time" but no explicit unit get "[ns]" appended.
    size: int
        Number of rows to allocate
    cats: dict {col: labels}
        Labels for categorical columns, keyed like the entries of ``cols``,
        e.g., {1: ['mary', 'mo']}. If labels is an integer, `{'col': 5}`,
        temporary labels are generated with a RangeIndex of that length. If
        None, or the column is missing, a placeholder RangeIndex of 2**14
        labels is used.
    cols: list of labels
        assigned column names, including categorical ones. Defaults to
        ``range(len(types))``. Names are converted with ``str()`` for the
        dataframe.
    index_types: list of str
        For one or more index columns, make them have this type. See general
        description, above, for caveats about multi-indexing. If None (or
        empty), the index will be the default RangeIndex.
    index_names: list of str
        Names of the index column(s), if using. Names matching
        ``__index_level_<n>__`` are replaced by None on the final index.
    timezones: dict {col: timezone_str}
        for timestamp type columns, apply this timezone to the pandas series;
        the numpy view will be UTC. Keys are looked up as ``str(col)``.
    columns_dtype: dtype-like, optional
        If given, the dataframe's column Index is built with this dtype.

    Returns
    -------
    - dataframe with correct shape and data-types
    - dict of views, keyed by column/index name, onto the data of the
        dataframe. Assign to these.

    Raises
    ------
    ValueError
        If a single index is requested but its name is None.
    """
    views = {}
    timezones = timezones or {}

    if isinstance(types, str):
        types = types.split(',')
    cols = cols if cols is not None else range(len(types))

    def cat(col):
        # Categories to use for column `col`: explicit labels, a range of the
        # requested length, or a placeholder range when nothing is known yet.
        if cats is None or col not in cats:
            return RangeIndex(0, 2**14)
        elif isinstance(cats[col], int):
            return RangeIndex(0, cats[col])
        else:  # explicit labels list
            return cats[col]

    # Step 1: build a zero-length frame so pandas sets up blocks of the right
    # dtypes; the real-length arrays are patched in further down.
    df = OrderedDict()
    for t, col in zip(types, cols):
        if str(t) == 'category':
            df[str(col)] = Categorical.from_codes([], categories=cat(col))
        elif isinstance(t, BaseMaskedDtype):
            # pandas masked types
            arr_type = t.construct_array_type()
            df[str(col)] = arr_type(
                values=np.empty(0, dtype=t.numpy_dtype),
                mask=np.empty(0, dtype=np.bool_),
                copy=False
            )
        else:
            if hasattr(t, 'base'):
                # funky pandas not-dtype
                t = t.base
            if ("M" in str(t) or "time" in str(t)) and "[" not in str(t):
                t = str(t) + "[ns]"
            d = np.empty(0, dtype=t)
            if d.dtype.kind == "M" and str(col) in timezones:
                try:
                    z = tz_to_dt_tz(timezones[str(col)])
                    d = Series(d).dt.tz_localize(z)
                except Exception:
                    # Best effort: fall back to a naive column, but do not
                    # swallow KeyboardInterrupt/SystemExit as a bare
                    # ``except:`` would.
                    warnings.warn("Inferring time-zone from %s in column %s "
                                  "failed, using time-zone-agnostic"
                                  "" % (timezones[str(col)], col))
            df[str(col)] = d

    columns = Index(df.keys(), dtype=columns_dtype) if columns_dtype is not None else None
    df = DataFrame(df, columns=columns)

    # Step 2: build the index (and its views) at full length.
    if not index_types:
        index = RangeIndex(size)
    elif len(index_types) == 1:
        t, col = index_types[0], index_names[0]
        if col is None:
            raise ValueError('If using an index, must give an index name')
        if str(t) == 'category':
            # https://github.com/dask/fastparquet/issues/576#issuecomment-805579337
            temp = Categorical.from_codes([], categories=cat(col))
            vals = np.zeros(size, dtype=temp.codes.dtype)
            c = Categorical.from_codes(vals, dtype=temp.dtype)
            index = CategoricalIndex(c)

            views[col] = vals
            views[col+'-catdef'] = index._data
        else:
            if hasattr(t, 'base'):
                # funky pandas not-dtype
                t = t.base
            # Initialize datetime index to zero: uninitialized data might fail
            # validation due to being an out-of-bounds datetime. xref
            # https://github.com/dask/fastparquet/issues/778
            dtype = np.dtype(t)
            d = np.zeros(size, dtype=dtype) if dtype.kind == "M" else np.empty(size, dtype=dtype)
            if d.dtype.kind == "M" and str(col) in timezones:
                # 1) create the DatetimeIndex in UTC as no datetime conversion is needed and
                # it works with d uninitialised data (no NonExistentTimeError or AmbiguousTimeError)
                # 2) convert to timezone (if UTC=noop, if None=remove tz, if other=change tz)
                index = DatetimeIndex(d, tz="UTC").tz_convert(
                    tz_to_dt_tz(timezones[str(col)]))
            else:
                index = Index(d)
            views[col] = index.values
    else:
        # Multi-index: start from an empty MultiIndex and fill its private
        # level/code lists by hand, one entry per index name.
        index = MultiIndex([[]], [[]])
        index._levels = list()
        index._labels = list()
        index._codes = list()
        index._names = list(index_names)
        for i, col in enumerate(index_names):
            # Placeholder level; replaced on the first set_cats call.
            index._levels.append(Index([None]))

            # i and col are bound as defaults so each closure keeps the
            # values of its own loop iteration.
            def set_cats(values, i=i, col=col, **kwargs):
                values.name = col
                if index._levels[i][0] is None:
                    index._levels[i] = values
                elif not index._levels[i].equals(values):
                    raise RuntimeError("Different dictionaries encountered"
                                       " while building categorical")

            x = Dummy()
            x._set_categories = set_cats
            x._multiindex = True

            d = np.zeros(size, dtype=int)
            if PANDAS_VERSION >= Version("0.24.0"):
                index._codes = list(index._codes) + [d]
            else:
                index._labels.append(d)
            views[col] = d
            views[col+'-catdef'] = x

    # Patch our blocks with desired-length arrays.  Kids: don't try this at home.
    mgr = df._mgr
    for block in mgr.blocks:
        bvalues = block.values
        shape = list(bvalues.shape)
        shape[-1] = size

        if isinstance(bvalues, Categorical):
            # -1 is the "missing" code, so every row starts out as null.
            code = np.full(fill_value=-1, shape=shape, dtype=bvalues.codes.dtype)

            values = Categorical.from_codes(codes=code, dtype=bvalues.dtype)

        elif isinstance(bvalues.dtype, DatetimeTZDtype):
            dt = "M8[ns]" if PANDAS_VERSION.major < 2 else f'M8[{bvalues.dtype.unit}]'
            values = np.zeros(shape=shape, dtype=dt)
            values = type(bvalues)._from_sequence(values.view("int64"), copy=False, dtype=bvalues.dtype)
        else:
            if not isinstance(bvalues, np.ndarray):
                # e.g. DatetimeLikeBlock backed by DatetimeArray/TimedeltaArray
                if bvalues.dtype.kind == "m":
                    dt = "m8[ns]" if PANDAS_VERSION.major < 2 else bvalues.dtype
                    values = np.zeros(shape=shape, dtype=dt)
                    values = type(bvalues)._from_sequence(values.view("int64"), copy=False, dtype=bvalues.dtype)
                elif bvalues.dtype.kind == "M":
                    dt = "M8[ns]" if PANDAS_VERSION.major < 2 else bvalues.dtype
                    values = np.zeros(shape=shape, dtype=dt)
                    values = type(bvalues)._from_sequence(values.view("int64"), copy=False, dtype=bvalues.dtype)
                elif str(bvalues.dtype)[0] in {"I", "U"} or str(bvalues.dtype) == "boolean":
                    # Masked integer / boolean arrays: data uninitialised,
                    # mask all-False.
                    arr_type = bvalues.dtype.construct_array_type()
                    values = arr_type(
                        values=np.empty(size, dtype=bvalues.dtype.numpy_dtype),
                        mask=np.zeros(size, dtype=np.bool_)
                    )
                else:
                    raise NotImplementedError
            else:
                values = np.empty(shape=shape, dtype=bvalues.dtype)

        block.values = values

    # Swap in the full-length index to match the patched blocks.
    mgr.axes[-1] = index

    # create views
    for block in df._mgr.blocks:
        dtype = block.dtype
        inds = block.mgr_locs.indexer
        if isinstance(inds, slice):
            inds = list(range(inds.start, inds.stop, inds.step))
        for i, ind in enumerate(inds):
            col = df.columns[ind]
            if isinstance(dtype, CategoricalDtype):
                views[col] = block.values._codes
                views[col+'-catdef'] = block.values
            elif getattr(block.dtype, 'tz', None):
                arr = np.asarray(block.values, dtype='M8[ns]')
                if len(arr.shape) > 1:
                    # pandas >= 1.3 does this for some reason
                    arr = arr.squeeze(axis=0)
                views[col] = arr
            elif str(dtype)[0] in {"I", "U"} or str(dtype) == "boolean":
                # Masked arrays are handed out whole rather than as a raw
                # numpy row.
                views[col] = block.values
            else:
                # i-th row of the block's array corresponds to this column.
                views[col] = block.values[i]

    if index_names:
        df.index.names = [
            None if re.match(r'__index_level_\d+__', n) else n
            for n in index_names
        ]
    return df, views


def tz_to_dt_tz(z):
    """Convert a "HH:MM"-style offset string into a ``datetime.timezone``.

    Strings that contain no ":" are returned unchanged. Otherwise the text
    before the first ":" is parsed as signed hours and the remainder as
    minutes; the minutes take a negative sign when the string starts with "-".
    """
    if ":" not in z:
        return z

    import datetime

    hour_text, minute_text = z.split(":", 1)
    # The hour part carries its own sign through int(); the minute part does
    # not, so it borrows the sign from the leading character of the string.
    minute_sign = -1 if z.startswith("-") else 1
    total_seconds = int(hour_text) * 3600 + minute_sign * int(minute_text) * 60
    return datetime.timezone(datetime.timedelta(seconds=total_seconds))