File: dataframe-hive.rst

package info (click to toggle)
dask 2024.12.1%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 20,024 kB
  • sloc: python: 105,182; javascript: 1,917; makefile: 159; sh: 88
file content (246 lines) | stat: -rw-r--r-- 9,351 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
.. _dataframe.hive:

Using Hive Partitioning with Dask
=================================

.. currentmodule:: dask.dataframe

It is sometimes useful to write your dataset with a hive-like directory scheme.
For example, if your dataframe contains ``'year'`` and ``'semester'`` columns,
a hive-based directory structure might look something like the following::

    output-path/
    ├── year=2022/
    │   ├── semester=fall/
    │   │   └── part.0.parquet
    │   └── semester=spring/
    │       ├── part.0.parquet
    │       └── part.1.parquet
    └── year=2023/
        └── semester=fall/
            └── part.1.parquet

The use of this self-describing structure implies that all rows within
the ``'output-path/year=2022/semester=fall/'`` directory will contain
the value ``2022`` in the ``'year'`` column and the value ``'fall'``
in the ``'semester'`` column.

The primary advantage of generating a hive-partitioned dataset
is that certain IO filters can be applied by :func:`read_parquet`
without the need to parse any file metadata. In other words,
the following command will typically be faster when the dataset
is already hive-partitioned on the ``'year'`` column.

.. code-block:: python

    >>> dd.read_parquet("output-path", filters=[("year", ">", 2022)])

Writing Parquet Data with Hive Partitioning
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Dask's :func:`to_parquet` function will produce a hive-partitioned
directory scheme automatically when the ``partition_on`` option is used.

.. code-block:: python

    >>> df.to_parquet("output-path", partition_on=["year", "semester"])

    >>> os.listdir("output-path")
    ["year=2022", "year=2023"]

    >>> os.listdir("output-path/year=2022")
    ["semester=fall", "semester=spring"]

    >>> os.listdir("output-path/year=2022/semester=spring")
    ['part.0.parquet', 'part.1.parquet']


It is important to recognize that Dask will **not** aggregate the
data files written within each of the leaf directories. This is
because each of the DataFrame partitions is written independently 
during the execution of the :func:`to_parquet` task graph. In order 
to write out data for partition `i`, the partition-i write task will
perform a `groupby` operation on columns ``["year", "semester"]``,
and then each distinct group will be written to the corresponding
directory using the file name ``'part.{i}.parquet'``. Therefore, it
is possible for a hive-partitioned write to produce a large number
of files in every leaf directory (one for each DataFrame partition).

If your application requires you to produce a single parquet file
for each hive partition, one possible solution is to sort or shuffle
on the partitioning columns before calling :func:`to_parquet`.

.. code-block:: python

    >>> partition_on = ["year", "semester"]

    >>> df.shuffle(on=partition_on).to_parquet(partition_on=partition_on)

Using a global shuffle like this is extremely expensive, and should be
avoided whenever possible. However, it is also guaranteed to produce
the minimum number of files, which may be worth the sacrifice at times.


Reading Parquet Data with Hive Partitioning
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

In most cases, :func:`read_parquet` will process hive-partitioned
data automatically. By default, all hive-partitioned columns will
be interpreted as categorical columns.

.. code-block:: python

    >>> ddf = dd.read_parquet("output-path", columns=["year", "semester"])

    >>> ddf
    Dask DataFrame Structure:
                            year         semester
    npartitions=4                                  
                category[known]  category[known]
                            ...              ...
                            ...              ...
                            ...              ...
                            ...              ...
    Dask Name: read-parquet, 1 graph layer

    >>> ddf.compute()
    year semester
    0  2022     fall
    1  2022     fall
    2  2022     fall
    3  2022   spring
    4  2022   spring
    5  2022   spring
    6  2023     fall
    7  2023     fall


Defining a Custom Partitioning Schema
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

It is possible to specify a custom schema for the hive-partitioned columns. The columns
will then be read using the specified types and not as `category`.

.. code-block:: python

    >>> schema = pa.schema([("year", pa.int16()), ("semester", pa.string())])

    >>> ddf2 = dd.read_parquet(
    ...     path,
    ...     columns=["year", "semester"],
    ...     dataset={"partitioning": {"flavor": "hive", "schema": schema}}
    ... )
    Dask DataFrame Structure:
                    year semester
    npartitions=4                
                int16   object
                    ...      ...
                    ...      ...
                    ...      ...
                    ...      ...


If any of your hive-partitioned columns contain null values, you
**must** specify the partitioning schema in this way.

Although it is not required, we also recommend that you specify
the partitioning schema if you need to partition on high-cardinality
columns. This is because the default ``'category'`` dtype will
track the known categories in a way that can significantly increase
the overall memory footprint of your Dask collection. In fact,
:func:`read_parquet` already clears the "known categories" of other
columns for this same reason (see :doc:`dataframe-categoricals`).


Best Practices
--------------

Although hive partitioning can sometimes improve read performance
by simplifying filtering, it can also lead to degraded performance
and errors in other cases.


Avoid High Cardinality
~~~~~~~~~~~~~~~~~~~~~~

A good rule of thumb is to avoid partitioning on `float` columns,
or any column containing many unique values (i.e. high cardinality).

The most common cause of poor user experience with hive partitioning
is high-cardinality of the partitioning column(s). For example, if 
you try to partition on a column with millions of unique values, then
`:func:`to_parquet`` will need to generate millions of directories. The
management of these directories is likely to put strain on the file
system, and the need for many small files within each directory
is sure to compound the issue.


Use Simple Data Types for Partitioning
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Since hive-partitioned data is "self describing," we suggest that
you avoid partitioning on complex data types, and opt for integer
or string-based data types whenever possible. If your data type
cannot be easily inferred from the string value used to define the
directory name, then the IO engine may struggle to parse the values.

For example, directly partitioning on a column with a ``datetime64``
dtype might produce a directory name like the following::

    output-path/
    ├── date=2022-01-01 00:00:00/
    ├── date=2022-02-01 00:00:00/
    ├── ...
    └── date=2022-12-01 00:00:00/

These directory names will not be correctly interpreted as
``datetime64`` values, and are even considered illegal on Windows
systems. For more-reliable behavior, we recommend that such a column
be decomposed into one or more "simple" columns. For example, one
could easily use ``'date'`` to construct ``'year'``, ``'month'``,
and ``'day'`` columns (as needed).


Aggregate Files at Read Time
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. warning::
    The ``aggregate_files`` argument is currently listed as
    experimental. However, there are currently no plans to remove
    the argument or change it's behavior in a future release.

Since hive-partitioning will typically produce a large number of
small files, :func:`read_parquet` performance will usually benefit
from proper usage of the ``aggregate_files`` argument. Take the
following dataset for example::

    dataset-path/
    ├── region=1/
    │   ├── section=a/
    │   │   └── 01.parquet
    │   │   └── 02.parquet
    │   │   └── 03.parquet
    │   ├── section=b/
    │   └── └── 04.parquet
    │   └── └── 05.parquet
    └── region=2/
        ├── section=a/
        │   ├── 06.parquet
        │   ├── 07.parquet
        │   ├── 08.parquet

If we set ``aggregate_files=True`` for this case, we are telling Dask
that any of the parquet data files may be aggregated into the same output
DataFrame partition. If, instead, we specify the name of a partitioning
column (e.g. ``'region'`` or ``'section'``), we allow the aggregation of
any two files sharing a file path up to, and including, the corresponding
directory name. For example, if ``aggregate_files`` is set to ``'section'``,
``04.parquet`` and ``05.parquet`` may be aggregated together, but
``03.parquet`` and ``04.parquet`` cannot be. If, however, ``aggregate_files``
is set to ``'region'``, ``04.parquet`` may be aggregated with ``05.parquet``,
**and** ``03.parquet`` may be aggregated with ``04.parquet``.

Using ``aggregate_files`` will typically improve performance by making it
more likely for DataFrame partitions to approach the size specified by the
``blocksize`` argument. In contrast, default behavior may produce a large
number of partitions that are much smaller than ``blocksize``.