File: async.rst

Asynchronous Computation
========================

*This section is only relevant if you want to use time-based functionality.  If
you are only using operations like map and accumulate then you can safely skip
this section.*

When using time-based flow control like ``rate_limit``, ``delay``, or
``timed_window``, Streamz relies on the Tornado_ framework for concurrency.
This allows us to handle many concurrent operations cheaply and consistently
within a single thread.  However, this also adds complexity and requires some
understanding of asynchronous programming.  There are a few different ways to
use Streamz with a Tornado event loop.

We give a few examples below that all do the same thing, but with different
styles.  In each case we use the following toy functions:

.. code-block:: python

   from tornado import gen
   import time

   def increment(x):
       """ A blocking increment function

       Simulates a computational function that was not designed to work
       asynchronously
       """
       time.sleep(0.1)
       return x + 1

   @gen.coroutine
   def write(x):
       """ A non-blocking write function

       Simulates writing to a database asynchronously
       """
       yield gen.sleep(0.2)
       print(x)

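The same toy ``write`` function could also be written with native
``async``/``await`` syntax.  This is a minimal sketch, assuming your Tornado
and streamz versions accept a native coroutine wherever a ``@gen.coroutine``
function is accepted:

.. code-block:: python

   from tornado import gen

   async def write(x):
       """ A non-blocking write function using async/await syntax

       Simulates writing to a database asynchronously
       """
       await gen.sleep(0.2)  # gen.sleep returns a Future, which can be awaited directly
       print(x)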

Within the Event Loop
---------------------

You may have an application that runs strictly within an event loop.

.. code-block:: python

   from streamz import Stream
   from tornado.ioloop import IOLoop

   @gen.coroutine
   def f():
       source = Stream(asynchronous=True)  # tell the stream we're working asynchronously
       source.map(increment).rate_limit(0.500).sink(write)

       for x in range(10):
           yield source.emit(x)

   IOLoop().run_sync(f)

We call Stream with the ``asynchronous=True`` keyword, informing it that it
should expect to operate within an event loop.  This ensures that calls to
``emit`` return Tornado futures rather than block.  We wait on results using
``yield``.

.. code-block:: python

   yield source.emit(x)  # waits until the pipeline is ready

This would also work with async/await syntax in Python 3:

.. code-block:: python

   from streamz import Stream
   from tornado.ioloop import IOLoop

   async def f():
       source = Stream(asynchronous=True)  # tell the stream we're working asynchronously
       source.map(increment).rate_limit(0.500).sink(write)

       for x in range(10):
           await source.emit(x)

   IOLoop().run_sync(f)


Event Loop on a Separate Thread
-------------------------------

Sometimes the event loop runs on a separate thread.  This is common when you
want to support interactive workloads (the user needs their own thread for
interaction) or when using Dask (next section).

.. code-block:: python

   from streamz import Stream

   source = Stream(asynchronous=False)  # starts IOLoop in separate thread
   source.map(increment).rate_limit('500ms').sink(write)

   for x in range(10):
       source.emit(x)

In this case we pass ``asynchronous=False`` to inform the stream that it is
expected to perform time-based computation (our write function is a coroutine)
but that it should not expect to run in an event loop, and so needs to start
its own in a separate thread.  Now when we call ``source.emit`` normally,
without using ``yield`` or ``await``, the emit call blocks while it waits for
the corresponding coroutine to finish within the IOLoop.

All functions here happen on the IOLoop.  This is good for consistency, but can
cause other concurrent applications to become unresponsive if your functions
(like ``increment``) block for long periods of time.  You might address this by
using Dask (see below) which will offload these computations onto separate
threads or processes.

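As a rough, illustrative check, you can time the emit loop from the example
above; the elapsed time reflects the 500ms rate limit, the blocking
``increment``, and the ``write`` coroutine all running through the IOLoop:

.. code-block:: python

   import time

   start = time.time()
   for x in range(10):
       source.emit(x)     # blocks the main thread while the IOLoop thread runs the pipeline
   print('elapsed:', time.time() - start)   # several seconds rather than returning immediately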

Using Dask
----------

Dask_ is a parallel computing library that uses Tornado for concurrency and
threads for computation.  The ``DaskStream`` object is a drop-in replacement
for ``Stream`` (mostly). Typically we create a Dask client, and then
``scatter`` a local Stream to become a DaskStream.

.. code-block:: python

   from dask.distributed import Client
   client = Client(processes=False)  # starts thread pool, IOLoop in separate thread

   from streamz import Stream
   source = Stream()
   (source.scatter()       # scatter local elements to cluster, creating a DaskStream
          .map(increment)  # map a function remotely
          .buffer(5)       # allow five futures to stay on the cluster at any time
          .gather()        # bring results back to local process
          .sink(write))    # call write locally

   for x in range(10):
       source.emit(x)

This operates very much like the synchronous case in terms of coding style (no
``@gen.coroutine`` or ``yield``) but does computations on separate threads.
This also provides parallelism and access to a dashboard at
http://localhost:8787/status .


Asynchronous Dask
-----------------

Dask can also operate within an event loop if preferred.  Here you can get the
non-blocking operation within an event loop while also offloading computations
to separate threads.

.. code-block:: python

   from dask.distributed import Client
   from streamz import Stream
   from tornado.ioloop import IOLoop

   async def f():
       client = await Client(processes=False, asynchronous=True)
       source = Stream(asynchronous=True)
       source.scatter().map(increment).rate_limit('500ms').gather().sink(write)

       for x in range(10):
           await source.emit(x)

   IOLoop().run_sync(f)


.. _Tornado: http://www.tornadoweb.org/en/stable/
.. _Dask: https://dask.pydata.org/en/latest/