DataFrames
==========

When handling large volumes of streaming tabular data it is often more
efficient to pass around larger Pandas dataframes with many rows each rather
than individual Python tuples or dicts. Handling and computing on data with
Pandas can be much faster than operating on individual Python objects. So one
could imagine building streaming dataframe pipelines using the ``.map`` and
``.accumulate`` streaming operators with functions that consume and produce
Pandas dataframes, as in the following example:

.. code-block:: python

   from streamz import Stream

   def query(df):
       return df[df.name == 'Alice']

   def aggregate(acc, df):
       return acc + df.amount.sum()

   stream = Stream()
   stream.map(query).accumulate(aggregate, start=0)
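
To actually run this you would keep a handle on the end of the chain, attach a
sink, and push dataframes into the source; a minimal sketch (the sample data
is invented for illustration):

.. code-block:: python

   import pandas as pd

   stream = Stream()
   totals = stream.map(query).accumulate(aggregate, start=0)
   totals.sink(print)  # print each running total as it is emitted

   df = pd.DataFrame({'name': ['Alice', 'Bob'], 'amount': [100, 200]})
   stream.emit(df)  # prints 100
   stream.emit(df)  # prints 200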

This is fine, and straightforward to do if you understand ``streamz.core``
and Pandas, and have some skill with developing algorithms.

Streaming Dataframes
--------------------

The ``streamz.dataframe`` module provides a streaming dataframe object that
implements many of these algorithms for you. It provides a Pandas-like
interface on streaming data. Our example above is rewritten below using
streaming dataframes:

.. code-block:: python

   import pandas as pd
   from streamz.dataframe import DataFrame

   example = pd.DataFrame({'name': [], 'amount': []})
   sdf = DataFrame(stream, example=example)
   sdf[sdf.name == 'Alice'].amount.sum()
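
As with any stream, you can subscribe to the result and then feed data in; a
minimal sketch, keeping a handle on the result (sample data invented for
illustration):

.. code-block:: python

   total = sdf[sdf.name == 'Alice'].amount.sum()
   total.stream.sink(print)  # print the running sum as it updates

   stream.emit(pd.DataFrame({'name': ['Alice', 'Bob'],
                             'amount': [100.0, 200.0]}))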
The two examples are identical in terms of performance and execution. The
resulting streaming dataframe contains a ``.stream`` attribute which is
equivalent to the ``stream`` produced in the first example. Streaming
dataframes are only syntactic sugar on core streams.

Supported Operations
--------------------

Streaming dataframes support the following classes of operations:

- Elementwise operations like ``df.x + 1``
- Filtering like ``df[df.name == 'Alice']``
- Column addition like ``df['z'] = df.x + df.y``
- Reductions like ``df.amount.mean()``
- Groupby-aggregations like ``df.groupby(df.name).amount.mean()``
- Windowed aggregations (fixed length) like ``df.window(n=100).amount.sum()``
- Windowed aggregations (index valued) like ``df.window(value='2h').amount.sum()``
- Windowed groupby aggregations like ``df.window(value='2h').groupby('name').amount.sum()``
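
A few of these, written against the ``sdf`` defined above, might look like the
sketch below (the value window assumes the emitted dataframes carry a datetime
index):

.. code-block:: python

   sdf['z'] = sdf.amount + 1                     # column addition
   means = sdf.groupby(sdf.name).amount.mean()   # groupby-aggregation
   recent = sdf.window(value='2h').amount.sum()  # windowed aggregation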

DataFrame Aggregations
----------------------

Dataframe aggregations are composed of an aggregation (like sum, mean, ...)
and a windowing scheme (fixed sized windows, index-valued, all time, ...).

Aggregations
++++++++++++

Streaming Dataframe aggregations are built from three methods:

- ``initial``: Creates initial state given an empty example dataframe
- ``on_new``: Updates state and produces a new result to emit given new data
- ``on_old``: Updates state and produces a new result to emit given decayed data

So a simple implementation of ``mean`` as an aggregation might look like the
following:

.. code-block:: python

   from streamz.dataframe import Aggregation

   class Mean(Aggregation):
       def initial(self, new):
           # An empty slice gives zero-valued state with the right dtype
           state = new.iloc[:0].sum(), new.iloc[:0].count()
           return state

       def on_new(self, state, new):
           total, count = state
           total = total + new.sum()
           count = count + new.count()
           new_state = (total, count)
           new_value = total / count
           return new_state, new_value

       def on_old(self, state, old):
           total, count = state
           total = total - old.sum()    # switch + for - here
           count = count - old.count()  # switch + for - here
           new_state = (total, count)
           new_value = total / count
           return new_state, new_value

These aggregations can then be used in a variety of different windowing
schemes with the ``aggregate`` method as follows:

.. code-block:: python

   df.aggregate(Mean())
   df.window(n=100).aggregate(Mean())
   df.window(value='60s').aggregate(Mean())

The windowing scheme's job is to deliver new and old data to your aggregation
for processing.

Windowing Schemes
+++++++++++++++++

Different windowing schemes like fixed sized windows (last 100 elements) or
value-indexed windows (last two hours of data) will track newly arrived and
decaying data and call these methods accordingly. The mechanism to track data
arriving and leaving is kept orthogonal from the aggregations themselves.
These windowing schemes include the following:

1. All previous data. Only ``initial`` and ``on_new`` are called; ``on_old``
   is never called.

   .. code-block:: python

      >>> df.sum()

2. The previous ``n`` elements:

   .. code-block:: python

      >>> df.window(n=100).sum()

3. An index range, like a time range for a datetime index:

   .. code-block:: python

      >>> df.window(value='2h').sum()

   Although this can be done for any range on any type of index, time is just
   a common case.
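
For example, with a monotonically increasing integer index one could keep the
rows whose index lies within the last ten units (a hedged sketch; exact
behavior depends on the index type):

.. code-block:: python

   >>> df.window(value=10).sum()
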
Windowing schemes generally maintain a deque of historical values within
accumulated state. As new data comes in they inspect that state and eject data
that no longer falls within the window.
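
A toy version of that mechanism (not the actual streamz internals, and
simplified to eject whole batches rather than splitting them) might look like:

.. code-block:: python

   from collections import deque

   def windowed_accumulate(acc, new, agg, n):
       """Drive aggregation ``agg`` with roughly the last ``n`` rows."""
       history, state = acc
       history.append(new)
       state, value = agg.on_new(state, new)
       # Eject the oldest batch while the remaining batches alone
       # still cover the window
       while sum(len(df) for df in history) - len(history[0]) >= n:
           old = history.popleft()
           state, value = agg.on_old(state, old)
       return (history, state), value

   # Initial accumulator: empty history plus the aggregation's initial
   # state, built from an empty example dataframe
   # acc = (deque(), agg.initial(example))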

Grouping
++++++++

Groupby aggregations also maintain historical data on the grouper and perform a
parallel aggregation on the number of times any key has been seen, removing
that key once it is no longer present.
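
For example, in a windowed groupby a key's group simply disappears from the
emitted result once its last row has decayed out of the window (this sketch
assumes a datetime index on the incoming data):

.. code-block:: python

   counts = sdf.window(value='2h').groupby('name').amount.sum()
   counts.stream.sink(print)  # a name vanishes 2h after its last event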

Dask
----

In all cases, dataframe operations are implemented only with the ``.map`` and
``.accumulate`` operators, and so are equally compatible with both core
``Stream`` and ``DaskStream`` objects.
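
A hedged sketch of the same pipeline on Dask (this assumes a running
``dask.distributed`` scheduler; ``scatter`` moves batches to the cluster and
``gather`` brings results back):

.. code-block:: python

   from dask.distributed import Client
   import pandas as pd
   from streamz import Stream
   from streamz.dataframe import DataFrame

   client = Client()  # start or connect to a Dask cluster

   source = Stream()
   example = pd.DataFrame({'name': [], 'amount': []})
   sdf = DataFrame(source.scatter(), example=example)  # DaskStream-backed

   total = sdf.amount.sum()
   total.stream.gather().sink(print)  # collect results back locally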

Not Yet Supported
-----------------

Streaming dataframe algorithms do not currently pay special attention to data
arriving out-of-order.

PeriodicDataFrame
-----------------

As you have seen above, Streamz can handle arbitrarily complex pipelines,
events, and topologies, but what if you simply want to run some Python
function periodically and collect or plot the results?

Streamz provides a high-level convenience class for this purpose, called
a PeriodicDataFrame. A PeriodicDataFrame uses Python's asyncio event loop
(used as part of Tornado in Jupyter and other interactive frameworks) to
call a user-provided function at a regular interval, collecting the results
and making them available for later processing.

In the simplest case, you can use a PeriodicDataFrame by first writing
a callback function like:

.. code-block:: python

   import numpy as np
   import pandas as pd

   def random_datapoint(**kwargs):
       return pd.DataFrame({'a': np.random.random(1)},
                           index=[pd.Timestamp.now()])

You can then make a streaming dataframe to poll this function
e.g. every 300 milliseconds:

.. code-block:: python

   from streamz.dataframe import PeriodicDataFrame

   df = PeriodicDataFrame(random_datapoint, interval='300ms')

``df`` will now be a steady stream of whatever values are returned by
``datafn`` (here, ``random_datapoint``), which can of course be any Python
code as long as it returns a DataFrame.

Here we returned only a single point, appropriate for streaming the
results of system calls or other isolated actions, but any number of
entries can be returned in the dataframe in a single batch. To
facilitate collecting such batches, the callback is invoked with
keyword arguments ``last`` (the time of the previous invocation) and
``now`` (the time of the current invocation) as Pandas Timestamp
objects. The callback can then generate or query for just the values
in that time range.

Arbitrary keyword arguments can be provided to the PeriodicDataFrame
constructor, and will be passed into the callback so that its behavior
can be parameterized.

For instance, you can write a callback that keeps the stream regularly
updated by returning a whole batch of datapoints, generated randomly to
cover the period since the last call:

.. code-block:: python

   def datablock(last, now, **kwargs):
       freq = kwargs.get("freq", pd.Timedelta("50ms"))
       index = pd.date_range(start=last + freq, end=now, freq=freq)
       return pd.DataFrame({'x': np.random.random(len(index))}, index=index)

   df = PeriodicDataFrame(datablock, interval='300ms')

The callback will now be invoked every 300ms, each time generating
datapoints at a rate of one every 50ms, returned as a batch. If you
wished, you could override the 50ms value by passing
``freq=pd.Timedelta("100ms")`` to the PeriodicDataFrame constructor:
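
.. code-block:: python

   # PeriodicDataFrame forwards extra keyword arguments to the callback,
   # so this overrides the default freq used inside datablock
   df = PeriodicDataFrame(datablock, interval='300ms',
                          freq=pd.Timedelta("100ms"))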

Similar code could, for example, query an external database for the time
range since the last update, returning all datapoints from that period.

Once you have a PeriodicDataFrame defined using such callbacks, you
can then use all the rest of the functionality supported by Streamz,
including aggregations, rolling windows, etc., and streaming
`visualization <plotting>`_.
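
For example, a windowed mean over the last five seconds of the periodic
stream defined above:

.. code-block:: python

   smooth = df.window(value='5s').x.mean()
   smooth.stream.sink(print)  # emits an updated mean as new batches arrive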