File: dask.rst

Dask Integration
================

The ``streamz.dask`` module contains a Dask_-powered implementation of the
core Stream object.  This is a drop-in implementation, but uses Dask for
execution and so can scale to a multicore machine or a distributed cluster.


Quickstart
----------

.. currentmodule:: streamz

Installation
++++++++++++

First install dask and dask.distributed::

   conda install dask
   or
   pip install dask[complete] --upgrade

You may also want to install Bokeh for web diagnostics::

   conda install -c bokeh bokeh
   or
   pip install bokeh --upgrade
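To confirm the installation worked, you can check that the packages import cleanly (a quick sanity check, not part of the original instructions):

```python
# Confirm dask and distributed are importable and report their versions
import dask
import distributed

print(dask.__version__, distributed.__version__)
```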

Start Local Dask Client
+++++++++++++++++++++++

Then start a local Dask cluster:

.. code-block:: python

   from dask.distributed import Client
   client = Client()

This operates on local processes or threads.  If you have Bokeh installed
then this will also start a diagnostics web server at
http://localhost:8787/status which you may want to open to get a real-time view
of execution.
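If you want explicit control over the local cluster, ``Client`` accepts keyword arguments that are forwarded to ``LocalCluster``.  The configuration below is only an illustration (a single in-process worker with four threads), not a recommendation:

```python
from dask.distributed import Client

# processes=False keeps everything in the current process (threads only),
# which is convenient for experimentation; the defaults use processes.
client = Client(processes=False, n_workers=1, threads_per_worker=4)
print(client)  # the repr includes the scheduler address and dashboard link
client.close()
```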

Sequential Execution
++++++++++++++++++++

.. autosummary::
   Stream.emit
   map
   sink

Before we build a parallel stream, let's build a sequential stream that maps a
simple function across data, and then prints those results.  We use the core
``Stream`` object.

.. code-block:: python

   from time import sleep

   def inc(x):
       sleep(1)  # simulate actual work
       return x + 1

   from streamz import Stream

   source = Stream()
   source.map(inc).sink(print)

   for i in range(10):
       source.emit(i)

This should take ten seconds because we call the ``inc`` function ten times
sequentially.

Parallel Execution
++++++++++++++++++

.. currentmodule:: streamz

.. autosummary::
   scatter
   buffer

.. currentmodule:: streamz.dask

.. autosummary::
   gather

That example ran sequentially with the core ``Stream``; now we use
``.scatter()`` to convert our stream into a DaskStream and ``.gather()`` to
convert back.

.. code-block:: python

   source = Stream()
   source.scatter().map(inc).buffer(8).gather().sink(print)

   for i in range(10):
       source.emit(i)

You may want to look at http://localhost:8787/status during execution to get a
sense of the parallel execution.

This should run much more quickly, depending on how many cores your machine
has.  We added a few extra nodes to our stream; let's look at what they did.

-   ``scatter``: Converted our Stream into a DaskStream.  The elements that we
    emitted into our source were sent to the Dask client, and the subsequent
    ``map`` call used that client's cores to perform the computations.
-   ``gather``: Converted our DaskStream back into a Stream, pulling data from
    our Dask client back to our local stream.
-   ``buffer(8)``: Normally ``gather`` would exert back-pressure so that the
    source would not accept new data until results finished and were pulled
    back to the local stream.  This back-pressure would limit parallelism.  To
    counteract this we add a buffer of size eight, allowing eight unfinished
    futures to build up in the pipeline before back-pressure is applied to
    ``source.emit``.
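A back-of-envelope model makes the expected speedup concrete.  This is only an illustration (the names ``n_tasks``, ``task_time``, and ``buffer_size`` are assumptions matching the example above, and it presumes one task per core with at least ``buffer_size`` cores available):

```python
import math

# Ten one-second tasks, with up to eight allowed in flight at once.
n_tasks, task_time, buffer_size = 10, 1.0, 8

sequential = n_tasks * task_time                         # every call waits for the last
parallel = math.ceil(n_tasks / buffer_size) * task_time  # 8 at once, then the remaining 2

print(sequential, parallel)  # 10.0 2.0
```

In practice scheduling overhead and a smaller core count will make the parallel run somewhat slower than this ideal.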

.. _Dask: https://dask.pydata.org/en/latest/


Gotchas
+++++++


An important gotcha with ``DaskStream`` is that it is a subclass of
``Stream``, and so can be used as an input to any function expecting a
``Stream``. If there is no intervening ``.gather()``, then the
downstream node will receive Dask futures instead of the data they
represent::

    source = Stream()
    source2 = Stream()
    a = source.scatter().map(inc)
    b = source2.combine_latest(a)

In this case, the combine operation will receive real values from
``source2`` but Dask futures from ``a``.  Downstream nodes would be free to
operate on the futures, but more likely the line should be::

    b = source2.combine_latest(a.gather())