File: collections.rst

package info (click to toggle)
python-streamz 0.6.4-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 824 kB
  • sloc: python: 6,714; makefile: 18; sh: 18
file content (101 lines) | stat: -rw-r--r-- 3,140 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
Collections
===========

Streamz high-level collection APIs are built on top of ``streamz.core``, and
bring special consideration to certain types of data:

1.  ``streamz.batch``: supports streams of lists of Python objects like tuples
    or dictionaries
2.  ``streamz.dataframe``: supports streams of Pandas/cudf dataframes or Pandas/cudf series.
    cudf support is in beta phase and has limited functionality as of cudf version ``0.8``

These high-level APIs help us handle common situations in data processing.
They help us implement complex algorithms and also improve efficiency.

These APIs are built on the streamz core operations (map, accumulate, buffer,
timed_window, ...) which provide the building blocks to build complex pipelines
but offer no help with what those functions should be.  The higher-level APIs
help to fill in this gap for common situations.


Conversion
----------

.. currentmodule:: streamz.core

.. autosummary::
   Stream.to_batch
   Stream.to_dataframe

You can convert from core Stream objects to Batch, and
DataFrame objects using the ``.to_batch`` and ``.to_dataframe``
methods.  In each case we assume that the stream is a stream of batches (lists
or tuples) or a list of Pandas dataframes.

.. code-block:: python

   >>> batch = stream.to_batch()
   >>> sdf = stream.to_dataframe()


To convert back from a Batch or a DataFrame to a
``core.Stream`` you can access the ``.stream`` property.

.. code-block:: python

   >>> stream = sdf.stream
   >>> stream = batch.stream

Example
-------

We create a stream and connect it to a file object

.. code-block:: python

    file = ...  # filename or file-like object
    from streamz import Stream

    source = Stream.from_textfile(file)

Our file produces line-delimited JSON serialized data on which we want to call
``json.loads`` to parse into dictionaries.

To reduce overhead we first batch our records up into 100-line batches and turn
this into a Batch object.  We provide our Batch object an
example element that it will use to help it determine metadata.

.. code-block:: python

    example = [{'name': 'Alice', 'x': 1, 'y': 2}]
    lines = source.partition(100).to_batch(example=example)  # batches of 100 elements
    records = lines.map(json.loads)  # convert lines to text.

We could have done the ``.map(json.loads)`` command on the original stream, but
this way reduce overhead by applying this function to lists of items, rather
than one item at a time.

Now we convert these batches of records into pandas dataframes and do some
basic filtering and groupby-aggregations.

.. code-block:: python

   sdf = records.to_dataframe()
   sdf = sdf[sdf.name == "Alice"]
   sdf = sdf.groupby(sdf.x).y.mean()

The DataFrames satisfy a subset of the Pandas API, but now rather than
operate on the data directly, they set up a pipeline to compute the data in an
online fashion.

Finally we convert this back to a stream and push the results into a fixed-size
deque.

.. code-block:: python

   from collections import deque
   d = deque(maxlen=10)

   sdf.stream.sink(d.append)

See :doc:`Collections API <collections-api>` for more information.