Shuffling Performance
=====================

.. currentmodule:: dask.dataframe

Operations like ``groupby``, ``join``, and ``set_index`` have special
performance considerations that are different from normal Pandas due to the
parallel, larger-than-memory, and distributed nature of Dask DataFrame.

Easy Case
---------

To start off, common groupby operations like
``df.groupby(columns).reduction()`` for known reductions like ``mean``, ``sum``,
``std``, ``var``, ``count``, and ``nunique`` are all quite fast and efficient, even
if partitions are not cleanly divided with known divisions.  This is the common
case.

Additionally, if divisions are known, then applying an arbitrary function to
groups is efficient when the grouping columns include the index.
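
For example (a minimal sketch; ``user_id`` and ``user_fn`` are illustrative
names, not part of any API), once the grouping column is the sorted index, the
user-defined function can be applied within each partition:

.. code-block:: python

   >>> ddf = ddf.set_index("user_id")          # sorts the data; divisions become known
   >>> ddf.groupby("user_id").apply(user_fn)   # applied per partition, no further shuffle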

Joins are also quite fast when joining a Dask DataFrame to a Pandas DataFrame
or when joining two Dask DataFrames along their index.  No special
considerations need to be made when operating in these common cases.

So, if you're doing common groupby and join operations, then you can stop reading
this.  Everything will scale nicely.  Fortunately, this is true most of the
time:

.. code-block:: python

   >>> ddf.groupby(columns).known_reduction()            # Fast and common case
   >>> ddf.groupby(columns_with_index).apply(user_fn)    # Fast and common case
   >>> ddf.join(pandas_df, on=column)                    # Fast and common case
   >>> lhs.join(rhs)                                     # Fast and common case
   >>> lhs.merge(rhs, on=columns_with_index)             # Fast and common case

Difficult Cases
---------------

In some cases, such as when applying an arbitrary function to groups (when not
grouping on the index with known divisions), when joining along non-index
columns, or when explicitly setting an unsorted column to be the index, we may
need to trigger a full dataset shuffle:

.. code-block:: python

   >>> ddf.groupby(columns_no_index).apply(user_fn)   # Requires shuffle
   >>> lhs.join(rhs, on=columns_no_index)             # Requires shuffle
   >>> ddf.set_index(column)                          # Requires shuffle

A shuffle is necessary when we need to re-sort our data along a new index.  For
example, if we have banking records that are organized by time and we now want
to organize them by user ID, then we'll need to move a lot of data around.  In
Pandas all of this data fits in memory, so this operation is easy.  Now that we
no longer assume that all data fits in memory, we must be a bit more careful.
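
As a rough sketch (the file path and the ``account_id`` column below are
hypothetical), re-organizing such records by user would look like this and
would trigger a full shuffle:

.. code-block:: python

   >>> records = dd.read_parquet("banking-records/")   # hypothetical dataset, organized by time
   >>> by_user = records.set_index("account_id")       # requires a full shuffle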

Re-sorting the data can be avoided by restricting yourself to the easy cases
mentioned above.


.. _shuffle-methods:

Shuffle Methods
---------------

There are currently two strategies to shuffle data depending on whether you are
on a single machine or on a distributed cluster: shuffle on disk and shuffle
over the network.

Shuffle on Disk
```````````````

When operating on larger-than-memory data on a single machine, we shuffle by
dumping intermediate results to disk.  This is done using the partd_ project
for on-disk shuffles.

.. _partd: https://github.com/dask/partd
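
If you want to request this behaviour explicitly (a sketch using the
``dataframe.shuffle.method`` option described below), set the method to
``"disk"``:

.. code-block:: python

   import dask

   # Explicitly select the on-disk shuffle, e.g. on a single machine
   dask.config.set({"dataframe.shuffle.method": "disk"})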

Shuffle over the Network
````````````````````````

When operating on a distributed cluster, the Dask workers may not have access to
a shared hard drive.  In this case, we shuffle data by breaking input partitions
into many pieces based on where they will end up and moving these pieces
throughout the network.

Selecting Methods
`````````````````

Dask uses on-disk shuffling by default, but switches to a distributed
shuffling algorithm if the default scheduler is set to use a
``dask.distributed.Client``, for example when the user sets the Client as the
default:

.. code-block:: python

    from dask.distributed import Client

    client = Client('scheduler:8786', set_as_default=True)

Alternatively, you can set the shuffle method explicitly with the
``dataframe.shuffle.method`` configuration option.  This can be done globally:

.. code-block:: python

    dask.config.set({"dataframe.shuffle.method": "p2p"})

    ddf.groupby(...).apply(...)

or as a context manager:

.. code-block:: python

    with dask.config.set({"dataframe.shuffle.method": "p2p"}):
        ddf.groupby(...).apply(...)


In addition, ``set_index`` also accepts a ``shuffle_method`` keyword argument that
can be used to select the on-disk, task-based, or peer-to-peer (``p2p``) shuffle
directly:

.. code-block:: python

    ddf.set_index(column, shuffle_method='disk')
    ddf.set_index(column, shuffle_method='tasks')
    ddf.set_index(column, shuffle_method='p2p')


.. _dataframe.groupby.aggregate:

Aggregate
=========

Dask supports Pandas' ``aggregate`` syntax to run multiple reductions on the
same groups.  Common reductions such as ``max``, ``sum``, ``list`` and ``mean`` are
directly supported:

.. code-block:: python

    >>> ddf.groupby(columns).aggregate(['sum', 'mean', 'max', 'min', list])

Dask also supports user-defined reductions.  To ensure proper performance, the
reduction has to be formulated in terms of three independent steps. The
``chunk`` step is applied to each partition independently and reduces the data
within a partition. The ``aggregate`` step combines the within-partition results.
The optional ``finalize`` step combines the results returned from the
``aggregate`` step and should return a single final column. For Dask to
recognize the reduction, it has to be passed as an instance of
``dask.dataframe.Aggregation``.

For example, ``sum`` could be implemented as:

.. code-block:: python

    import dask.dataframe as dd

    custom_sum = dd.Aggregation('custom_sum', lambda s: s.sum(), lambda s0: s0.sum())
    ddf.groupby('g').agg(custom_sum)

The ``name`` argument should be different from existing reductions to avoid data
corruption.  The arguments to each function are pre-grouped series objects,
similar to ``df.groupby('g')['value']``.
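
As a quick sanity check (a sketch reusing ``custom_sum`` and the ``ddf`` from
above), the custom reduction should agree with the built-in one:

.. code-block:: python

    >>> ddf.groupby('g').agg(custom_sum).compute()
    >>> ddf.groupby('g').sum().compute()   # should match the custom reduction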

Many reductions can only be implemented with multiple temporaries. To implement
these reductions, the steps should return tuples and expect multiple arguments.
A mean function can be implemented as:

.. code-block:: python

    custom_mean = dd.Aggregation(
        'custom_mean',
        lambda s: (s.count(), s.sum()),
        lambda count, sum: (count.sum(), sum.sum()),
        lambda count, sum: sum / count,
    )
    ddf.groupby('g').agg(custom_mean)


For example, let's compute the group-wise extent (maximum - minimum)
for a DataFrame.

.. code-block:: python

   >>> import pandas as pd
   >>> df = pd.DataFrame({
   ...   'a': ['a', 'b', 'a', 'a', 'b'],
   ...   'b': [0, 1, 0, 2, 5],
   ... })
   >>> ddf = dd.from_pandas(df, 2)

We define the building blocks to find the maximum and minimum of each chunk, and then
the maximum and minimum over all the chunks.  We finalize by taking the difference
between the Series of maxima and the Series of minima:

.. code-block:: python

   >>> def chunk(grouped):
   ...     return grouped.max(), grouped.min()

   >>> def agg(chunk_maxes, chunk_mins):
   ...     return chunk_maxes.max(), chunk_mins.min()

   >>> def finalize(maxima, minima):
   ...     return maxima - minima

Finally, we create and use the aggregation:

.. code-block:: python

   >>> extent = dd.Aggregation('extent', chunk, agg, finalize=finalize)
   >>> ddf.groupby('a').agg(extent).compute()
      b
   a
   a  2
   b  4

To apply :py:meth:`dask.dataframe.groupby.SeriesGroupBy.nunique` to more than one
column, you can use:

.. code-block:: python

    >>> df['c'] = [1, 2, 1, 1, 2]
    >>> ddf = dd.from_pandas(df, 2)
    >>> nunique = dd.Aggregation(
    ...     name="nunique",
    ...     chunk=lambda s: s.apply(lambda x: list(set(x))),
    ...     agg=lambda s0: s0.obj.groupby(level=list(range(s0.obj.index.nlevels))).sum(),
    ...     finalize=lambda s1: s1.apply(lambda final: len(set(final))),
    ... )
    >>> ddf.groupby('a').agg({'b':nunique, 'c':nunique})

To access NumPy functions, use ``apply`` with a lambda function such as ``.apply(lambda r: np.sum(r))``. Here's an example of what a sum of squares aggregation would look like:

.. code-block:: python

    >>> dd.Aggregation(name="sum_of_squares", chunk=lambda s: s.apply(lambda r: np.sum(np.power(r, 2))), agg=lambda s: s.sum())