Joins
=====

DataFrame joins are a common and expensive computation that benefits from
different optimizations in different situations.  Understanding how your data
is laid out and what you're trying to accomplish can have a large impact on
performance.  This page goes through the available options and their
performance implications.

Large to Large Unsorted Joins
-----------------------------

In the worst case you have two large tables, each with many partitions, and
you want to join them on a column that may not be sorted.

This can be slow.  In this case Dask DataFrame needs to move all of your
data around so that rows with matching values in the joining columns end up in
the same partition.  This large-scale movement incurs communication costs and
can require a large amount of memory.  If enough memory cannot be found, then
Dask will have to write data to disk and read it back, which adds further
performance costs.

Such joins work, but they are significantly slower than many other
operations and are best avoided where possible.
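
As a minimal sketch of this situation, the example below joins two
multi-partition DataFrames on an unsorted, non-index column.  The tables and
the column names ``key``, ``a``, and ``b`` are hypothetical and far too small
to show the cost; Dask may also pick a cheaper strategy for inputs this tiny.
The point is only that the result matches the equivalent Pandas merge, even
though matching rows start out in different partitions.

.. code-block:: python

    import pandas as pd
    import dask.dataframe as dd

    # Hypothetical toy tables; "key" is unsorted and is not the index
    left_pdf = pd.DataFrame({"key": [3, 1, 2, 1, 3, 2], "a": range(6)})
    right_pdf = pd.DataFrame({"key": [2, 3, 1, 2], "b": [10, 20, 30, 40]})

    left = dd.from_pandas(left_pdf, npartitions=3)
    right = dd.from_pandas(right_pdf, npartitions=2)

    # Joining on a plain column: rows with equal keys must be brought
    # into the same partition before they can be matched
    joined = left.merge(right, on="key", how="inner")

    # Row order is not guaranteed after the join, so sort before comparing
    cols = ["key", "a", "b"]
    result = joined.compute().sort_values(cols).reset_index(drop=True)
    expected = (
        left_pdf.merge(right_pdf, on="key", how="inner")
        .sort_values(cols)
        .reset_index(drop=True)
    )
    print(result.equals(expected))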

Large to Small Joins
--------------------

Many join or merge computations combine a large table with a small one.  If
the small table is either a single-partition Dask DataFrame or just a
normal Pandas DataFrame, then the computation can proceed in an embarrassingly
parallel way, where each partition of the large DataFrame is joined against the
single small table.  This incurs almost no overhead relative to Pandas joins.

If your smaller table fits comfortably in memory, then you may want to ensure
that it is a single partition by using the ``repartition`` method.

.. code-block:: python

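    import dask

    # Large table: one row every 10 seconds
    large = dask.datasets.timeseries(freq="10s", npartitions=10)
    # Small table: one row per day, with a single integer column "z"
    small = dask.datasets.timeseries(freq="1D", dtypes={"z": int})

    # Collapse the small table into a single partition before joining
    small = small.repartition(npartitions=1)
    result = large.merge(small, how="left", on=["timestamp"])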
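
The small table can also be a plain Pandas DataFrame, as mentioned above.  The
sketch below assumes a hypothetical lookup table ``regions`` and hypothetical
column names (``order_id``, ``region``, ``tax_rate``); it is an illustration
rather than a recommended schema.

.. code-block:: python

    import pandas as pd
    import dask.dataframe as dd

    # Hypothetical "large" table, split across several partitions
    large_pdf = pd.DataFrame(
        {"order_id": range(8), "region": [0, 1, 2, 0, 0, 1, 2, 2]}
    )
    large = dd.from_pandas(large_pdf, npartitions=4)

    # Hypothetical small lookup table kept as an in-memory Pandas DataFrame
    regions = pd.DataFrame(
        {"region": [0, 1, 2], "tax_rate": [0.05, 0.10, 0.20]}
    )

    # The Pandas DataFrame is passed directly as the right-hand side
    result = large.merge(regions, how="left", on="region")
    out = result.compute().sort_values("order_id").reset_index(drop=True)
    print(out)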

Sorted Joins
------------

The Pandas merge API supports the ``left_index=`` and ``right_index=`` options
to perform joins on the index.  For Dask DataFrames these keyword options hold
special significance if the index has known divisions
(see :ref:`dataframe-design-partitions`).
In this case the DataFrame partitions are aligned along these divisions (which
is generally fast) and then an embarrassingly parallel Pandas join happens
across partition pairs.  Because no full shuffle of the data is required, this
is usually much cheaper than an unsorted join.

Sorted or indexed joins are a good solution to the large-large join problem.
If you plan to join against a dataset repeatedly then it may be worthwhile to
set the index ahead of time, and possibly store the data in a format that
maintains that index, like Parquet.

.. code-block:: python

    import dask
    import dask.dataframe as dd

    left = dask.datasets.timeseries(dtypes={"foo": int})

    # timeseries returns a DataFrame that is already indexed by
    # timestamp, so calling set_index is unnecessary here:
    #
    # left = left.set_index("timestamp")

    left.to_parquet("left", overwrite=True)
    left = dd.read_parquet("left")

    right_one = dask.datasets.timeseries(dtypes={"bar": int})
    right_two = dask.datasets.timeseries(dtypes={"baz": int})

    result = left.merge(
        right_one, how="left", left_index=True, right_index=True)
    result = result.merge(
        right_two, how="left", left_index=True, right_index=True)
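
When the join column is not yet the index, ``set_index`` can be used first.
The following is a minimal sketch with hypothetical toy tables and column
names (``key``, ``a``, ``b``): both sides are indexed by ``key``, their
``known_divisions`` attribute is inspected, and the join is then performed on
the index.

.. code-block:: python

    import pandas as pd
    import dask.dataframe as dd

    # Hypothetical toy tables whose join column "key" starts out unsorted
    left_pdf = pd.DataFrame(
        {"key": [5, 3, 1, 4, 2, 0], "a": [50, 30, 10, 40, 20, 0]}
    )
    right_pdf = pd.DataFrame(
        {"key": [0, 1, 2, 3, 4, 5], "b": [0, 1, 4, 9, 16, 25]}
    )

    # set_index sorts each table by "key" and records partition divisions
    left = dd.from_pandas(left_pdf, npartitions=2).set_index("key")
    right = dd.from_pandas(right_pdf, npartitions=3).set_index("key")

    # Inspect whether Dask knows the divisions of each index
    print(left.known_divisions, right.known_divisions)

    # Join on the index of both tables
    joined = left.merge(right, how="inner", left_index=True, right_index=True)
    result = joined.compute().sort_index()
    print(result)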