Opportunistic Caching
=====================
Dask usually removes intermediate values as quickly as possible in order to
make space for more data to flow through your computation. However, in some
cases, we may want to hold onto intermediate values, because they might be
useful for future computations in an interactive session.
We need to balance the following concerns:
1. Intermediate results might be useful in future unknown computations
2. Intermediate results also fill up memory, reducing space for the rest of our
current computation
Negotiating between these two concerns helps us to leverage the memory that we
have available to speed up future, unanticipated computations. Which intermediate results
should we keep?
This document explains an experimental, opportunistic caching mechanism that automatically
picks out and stores the results of useful tasks.
Motivating Example
------------------
Consider computing the maximum value of a column in a CSV file:

.. code-block:: python

   >>> import dask.dataframe as dd
   >>> df = dd.read_csv('myfile.csv')
   >>> df.columns
   ['first-name', 'last-name', 'amount', 'id', 'timestamp']
   >>> df.amount.max().compute()
   1000

Even though our full dataset may be too large to fit in memory, the single
``df.amount`` column may be small enough to hold in memory just in case it
might be useful in the future. This is often the case during data exploration,
because we investigate the same subset of our data repeatedly before moving on.
For example, we may now want to find the minimum of the amount column:

.. code-block:: python

   >>> df.amount.min().compute()
   -1000

Under normal operations, this would need to read through the entire CSV file over
again. This is somewhat wasteful and stymies interactive data exploration.
Two Simple Solutions
--------------------
If we know ahead of time that we want both the maximum and minimum, we can
compute them simultaneously. Dask will share intermediates intelligently,
reading through the dataset only once:

.. code-block:: python

   >>> dd.compute(df.amount.max(), df.amount.min())
   (1000, -1000)

If we know that this column fits in memory, then we can also explicitly
compute the column and then continue forward with straight Pandas:

.. code-block:: python

   >>> amount = df.amount.compute()
   >>> amount.max()
   1000
   >>> amount.min()
   -1000

If either of these solutions works for you, great. Otherwise, continue on for a third approach.
Automatic Opportunistic Caching
-------------------------------
Another approach is to watch *all* intermediate computations, and *guess* which
ones might be valuable to keep for the future. Dask has an *opportunistic
caching mechanism* that stores intermediate tasks that show the following
characteristics:
1. Expensive to compute
2. Cheap to store
3. Frequently used
We can activate a fixed-size cache as a callback_:

.. _callback: diagnostics-local.html#custom-callbacks

.. code-block:: python

   >>> from dask.cache import Cache
   >>> cache = Cache(2e9)  # Leverage two gigabytes of memory
   >>> cache.register()    # Turn cache on globally

Now the cache will watch every small part of the computation and judge the
value of that part based on the three characteristics listed above (expensive
to compute, cheap to store, and frequently used).
Dask will hold on to 2GB of the
best intermediate results it can find, evicting older results as better results
come in. If the ``df.amount`` column fits in 2GB, then probably all of it will
be stored while we keep working on it.
If we start work on something else,
then the ``df.amount`` column will likely be evicted to make space for other
more timely results:

.. code-block:: python

   >>> df.amount.max().compute()  # slow the first time
   1000
   >>> df.amount.min().compute()  # fast because df.amount is in the cache
   -1000
   >>> df.id.nunique().compute()  # starts to push out df.amount from cache

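
To make the eviction behavior described above more concrete, here is a minimal pure-Python sketch of a size-limited cache that scores entries by the three characteristics. The ``ToyCache`` class, its scoring formula, and the task names are assumptions made for illustration; this is not the implementation used by Dask or cachey.

```python
class ToyCache:
    """Toy cache: keep the highest-scoring results within a byte budget."""

    def __init__(self, available_bytes):
        self.available_bytes = available_bytes
        self.total_bytes = 0
        self.data = {}    # key -> stored value
        self.nbytes = {}  # key -> size in bytes
        self.cost = {}    # key -> seconds it took to compute
        self.hits = {}    # key -> how often the key has been used

    def score(self, key):
        # Assumed formula: expensive to compute, cheap to store, frequently used
        return self.cost[key] / self.nbytes[key] * self.hits[key]

    def put(self, key, value, cost, nbytes):
        if key in self.data:  # replacing an entry: drop its old size first
            self.total_bytes -= self.nbytes[key]
        self.data[key] = value
        self.cost[key] = cost
        self.nbytes[key] = nbytes
        self.hits.setdefault(key, 1)
        self.total_bytes += nbytes
        # Evict the lowest-scoring entries until we are back within budget
        while self.total_bytes > self.available_bytes:
            worst = min(self.data, key=self.score)
            self.total_bytes -= self.nbytes.pop(worst)
            del self.data[worst], self.cost[worst]

    def get(self, key):
        if key in self.data:
            self.hits[key] += 1
            return self.data[key]
        return None


cache = ToyCache(available_bytes=1000)
cache.put("amount-chunk-0", [1, 2, 3], cost=2.0, nbytes=400)
cache.put("amount-chunk-1", [4, 5, 6], cost=2.0, nbytes=400)
cache.get("amount-chunk-0")  # reuse raises the score
cache.get("amount-chunk-1")
cache.get("amount-chunk-1")

# Cheap to recompute and large: scores lowest, so it is evicted immediately
cache.put("id-chunk-0", list(range(50)), cost=0.1, nbytes=600)

# Expensive enough to displace the least-used amount chunk
cache.put("id-chunk-1", list(range(50)), cost=6.0, nbytes=500)

print(sorted(cache.data))  # ['amount-chunk-1', 'id-chunk-1']
print(cache.total_bytes)   # 900
```

In this sketch a new result can be rejected right away if it scores worse than everything already held, and a well-used entry survives longer than a rarely used one of the same size and cost.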
Cache tasks, not expressions
----------------------------
This caching happens at the low-level scheduling layer, not the high-level
Dask DataFrame or Dask Array layer. We don't explicitly cache the column
``df.amount``. Instead, we cache the hundreds of small pieces of that column
that form the Dask graph. It could be that we end up caching only a fraction
of the column.
This means that the opportunistic caching mechanism described above works for *all* Dask
computations, as long as those computations employ a consistent naming scheme
(as all of Dask DataFrame, Dask Array, and Dask Delayed do).
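
The role of consistent naming can be illustrated with a small pure-Python sketch. The ``task_name`` and ``run`` helpers below are hypothetical and are not Dask's naming scheme. They only show the idea that if the same operation on the same inputs always gets the same key, a later computation can find results stored by an earlier one.

```python
import hashlib


def task_name(func, *args):
    """Hypothetical helper: the same function and arguments give the same key."""
    digest = hashlib.sha256(repr(args).encode()).hexdigest()[:8]
    return f"{func.__name__}-{digest}"


cache = {}                  # task name -> stored result
stats = {"parse_calls": 0}  # how many times the expensive step really ran


def parse_chunk(path, column, chunk):
    """Stand-in for parsing one chunk of one CSV column (made-up data)."""
    stats["parse_calls"] += 1
    return [chunk * 10, chunk * 10 + 1]


def run(func, *args):
    key = task_name(func, *args)
    if key not in cache:  # only compute on a cache miss
        cache[key] = func(*args)
    return cache[key]


# Two separate "computations" that happen to need the same three chunks
largest = max(max(run(parse_chunk, "myfile.csv", "amount", i)) for i in range(3))
smallest = min(min(run(parse_chunk, "myfile.csv", "amount", i)) for i in range(3))

print(largest, smallest, stats["parse_calls"])  # 21 0 3
```

The second pass reuses all three chunks because their keys match. If names were random on every run, every lookup would miss.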
You can see which tasks are held by the cache by inspecting the following
attributes of the cache object:

.. code-block:: python

   >>> cache.cache.data
   <stored values>
   >>> cache.cache.heap.heap
   <scores of items in cache>
   >>> cache.cache.nbytes
   <number of bytes per item in cache>

The cache object is powered by cachey_, a tiny library for opportunistic
caching.
.. _cachey: https://github.com/blaze/cachey
Disclaimer
----------
Opportunistic caching is not available when using the distributed scheduler.
Restricting your cache to a fixed size like 2GB requires Dask to accurately count
the size of each object in memory. This can be tricky, particularly
for Python objects like lists and tuples, and for DataFrames that contain
object dtypes.
It is entirely possible that the caching mechanism will
*undercount* the size of objects, causing it to use more memory than
anticipated, which can exhaust RAM and crash your session.
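
As a plain-Python illustration of why size counting is hard for container objects, the sketch below compares ``sys.getsizeof`` on a list with a recursive estimate that also follows the list's elements. The ``deep_sizeof`` helper is an assumption made for this example and does not describe how Dask or cachey measure size.

```python
import sys


def deep_sizeof(obj, seen=None):
    """Rough recursive size estimate for built-in containers."""
    seen = set() if seen is None else seen
    if id(obj) in seen:  # count shared objects only once
        return 0
    seen.add(id(obj))
    size = sys.getsizeof(obj)
    if isinstance(obj, dict):
        size += sum(deep_sizeof(k, seen) + deep_sizeof(v, seen)
                    for k, v in obj.items())
    elif isinstance(obj, (list, tuple, set, frozenset)):
        size += sum(deep_sizeof(item, seen) for item in obj)
    return size


names = [f"customer-{i:06d}" for i in range(10_000)]

shallow = sys.getsizeof(names)  # only the list's own array of references
deep = deep_sizeof(names)       # the list plus every string it refers to

print(shallow < deep)  # True
```

A cache that relied on the shallow number alone would believe this list is several times smaller than the memory it actually keeps alive.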