Plugins
=======

In addition to using the ``@Stream.register_api()`` decorator, custom stream nodes
can be added to Streamz by installing third-party Python packages.


Known plugins
-------------

Extras
++++++

These plugins are supported by the Streamz community and can be installed as extras,
e.g. ``pip install streamz[kafka]``.

There are no plugins here yet, but hopefully soon there will be.

.. only:: comment
    ================= ======================================================
    Extra name        Description
    ================= ======================================================
    ``files``         Advanced filesystem operations: listening for new
                      files in a directory, writing to multiple files etc.
    ``kafka``         Reading from and writing to Kafka topics.
    ================= ======================================================


Entry points
------------

Plugins register themselves with Streamz by using the ``entry_points`` argument
in ``setup.py``:

.. code-block:: Python

    # setup.py

    from setuptools import setup

    setup(
        name="streamz_example_plugin",
        version="0.0.1",
        entry_points={
            "streamz.nodes": [
                "repeat = streamz_example_plugin:RepeatNode"
            ]
        }
    )

In this example, the ``RepeatNode`` class will be imported from the
``streamz_example_plugin`` package and will be available as ``Stream.repeat``.
In practice, the class name and the entry point name (the part before ``=`` in the
entry point definition) are usually the same, but they *can* be different.
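
To make the ``name = module:attribute`` syntax concrete, the sketch below parses
the same declaration with the standard library's ``importlib.metadata.EntryPoint``
(the ``module`` and ``attr`` properties assume Python 3.9 or newer). It only
illustrates how the string is split; it does not show how Streamz itself processes
entry points.

.. code-block:: Python

    from importlib.metadata import EntryPoint

    # Same declaration as in the setup.py example, split into name and value.
    ep = EntryPoint(
        name="repeat",
        value="streamz_example_plugin:RepeatNode",
        group="streamz.nodes",
    )

    method_name = ep.name    # becomes the method name, i.e. Stream.repeat
    module_name = ep.module  # package that will be imported
    class_name = ep.attr     # class looked up inside that package

    print(method_name, module_name, class_name)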

Different kinds of add-ons go into different entry point groups:

=========== ======================= =====================
 Node type   Required parent class   Entry point group
=========== ======================= =====================
 Source      ``streamz.Source``      ``streamz.sources``
 Node        ``streamz.Stream``      ``streamz.nodes``
 Sink        ``streamz.Sink``        ``streamz.sinks``
=========== ======================= =====================
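
To check which plugins are registered in the current environment, the three
groups from the table can be queried with ``importlib.metadata``. This is a
minimal sketch: ``installed_plugins`` is a hypothetical helper name and is not
part of Streamz.

.. code-block:: Python

    from importlib.metadata import entry_points

    GROUPS = ("streamz.sources", "streamz.nodes", "streamz.sinks")


    def installed_plugins(groups=GROUPS):
        """Map each entry point group to a sorted list of declarations."""
        all_eps = entry_points()
        found = {}
        for group in groups:
            if hasattr(all_eps, "select"):
                # Python 3.10+: selectable collection of entry points.
                eps = all_eps.select(group=group)
            else:
                # Python 3.8/3.9: plain mapping of group name to entry points.
                eps = all_eps.get(group, [])
            found[group] = sorted(f"{ep.name} = {ep.value}" for ep in eps)
        return found


    if __name__ == "__main__":
        for group, plugins in installed_plugins().items():
            print(group, plugins)

Each list is empty if no package in the environment declares entry points in
that group.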


Lazy loading
++++++++++++

Streamz attaches a method for each installed plugin to the ``Stream`` class when
Streamz is imported, but the plugin class itself is loaded only when the
corresponding ``Stream`` method is first called. Streamz also validates the loaded
class before attaching it and raises an exception if validation fails.
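
The general technique can be sketched in plain Python. This illustrates the
stub pattern only and is not Streamz's actual code: ``Base``, ``FakeEntryPoint``
and ``attach_lazily`` are hypothetical names, and raising ``TypeError`` on failed
validation is an assumption.

.. code-block:: Python

    class Base:
        """Stand-in for ``streamz.Stream`` (hypothetical)."""


    class FakeEntryPoint:
        """Mimics the ``name`` attribute and ``load()`` method of an entry point."""

        def __init__(self, name, loader):
            self.name = name
            self._loader = loader
            self.load_count = 0

        def load(self):
            self.load_count += 1
            return self._loader()


    def attach_lazily(cls, entry_point):
        """Attach a stub method to ``cls``; the plugin loads on first call."""

        def stub(self, *args, **kwargs):
            node = entry_point.load()  # the import cost is paid here
            if not (isinstance(node, type) and issubclass(node, cls)):
                raise TypeError(
                    f"{entry_point.name} must be a subclass of {cls.__name__}"
                )

            def method(self, *args, **kwargs):
                return node(self, *args, **kwargs)

            # Replace the stub so later calls skip loading and validation.
            setattr(cls, entry_point.name, method)
            return node(self, *args, **kwargs)

        setattr(cls, entry_point.name, stub)


    class Repeat(Base):
        def __init__(self, upstream, n):
            self.upstream = upstream
            self.n = n


    ep = FakeEntryPoint("repeat", lambda: Repeat)
    attach_lazily(Base, ep)
    loads_before_call = ep.load_count  # nothing has been loaded yet

    node = Base().repeat(3)            # first call loads and validates
    Base().repeat(2)                   # second call reuses the loaded class
    loads_after_calls = ep.load_count

Attaching only a stub keeps the initial import cheap, and a broken plugin fails
only when its own method is called.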


Reference implementation
------------------------

Let's look at how stream nodes can be implemented.

.. code-block:: Python

    # __init__.py

    from tornado import gen
    from streamz import Stream


    class RepeatNode(Stream):

        def __init__(self, upstream, n, **kwargs):
            super().__init__(upstream, ensure_io_loop=True, **kwargs)
            self._n = n

        @gen.coroutine
        def update(self, x, who=None, metadata=None):
            for _ in range(self._n):
                yield self._emit(x, metadata=metadata)

As you can see, the implementation is the same as usual, but there's no
``@Stream.register_api()`` decorator: Streamz takes care of registration when
loading the plugin. The node will still work if you add the decorator, but you
don't have to.