File: test_batch.py

package info (click to toggle)
python-streamz 0.6.4-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 824 kB
  • sloc: python: 6,714; makefile: 18; sh: 18
file content (64 lines) | stat: -rw-r--r-- 1,519 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import pytest
import toolz

from streamz.batch import Batch, Streaming
from streamz.utils_test import inc


def test_core():
    """Element-wise Batch ops stay batched; a reduction yields a Streaming."""
    source = Batch()
    incremented = source.pluck('x').map(inc)
    total = incremented.sum()
    results = total.stream.sink_to_list()

    records = [{'x': n, 'y': 0} for n in range(4)]
    source.emit(records)

    assert isinstance(incremented, Batch)
    assert isinstance(total, Streaming)
    # Each x in 0..3 is incremented and then summed: 1 + 2 + 3 + 4.
    expected = sum(n + 1 for n in range(4))
    assert results == [expected]


def test_dataframes():
    """Mapping a Batch of dicts and converting it yields a streaming DataFrame."""
    pd = pytest.importorskip('pandas')
    from streamz.dataframe import DataFrame

    def add_z(record):
        # Derive column 'z' from the two existing fields of each record.
        return toolz.assoc(record, 'z', record['x'] + record['y'])

    records = [{'x': n, 'y': 2 * n} for n in range(10)]

    source = Batch(example=[{'x': 0, 'y': 0}])
    frame = source.map(add_z).to_dataframe()
    assert isinstance(frame, DataFrame)

    chunks = frame.stream.sink_to_list()
    # Feed the records in batches of (at most) three.
    for chunk in toolz.partition_all(3, records):
        source.emit(chunk)

    combined = pd.concat(chunks)
    assert combined.z.tolist() == [3 * n for n in range(10)]


def test_periodic_dataframes():
    """Smoke-test random_datapoint and PeriodicDataFrame setup/teardown."""
    pd = pytest.importorskip('pandas')
    from streamz.dataframe import PeriodicDataFrame
    from streamz.dataframe.core import random_datapoint

    # random_datapoint should produce a single-row frame for the given time.
    point = random_datapoint(now=pd.Timestamp.now())
    assert len(point) == 1

    def callback(now, **kwargs):
        # One-row frame indexed by the callback's timestamp. ``index`` must be
        # a DataFrame keyword, not a dict key; otherwise it becomes a column.
        return pd.DataFrame(dict(x=50), index=[now])

    df = PeriodicDataFrame(callback, interval='20ms')
    try:
        # NOTE(review): this comparison is built on streaming objects, so the
        # assert may only check the truthiness of a lazy result rather than
        # concrete values — TODO confirm and replace with a sink-based check.
        assert df.tail(0).x == 50
    finally:
        # Stop the periodic source even if the assertion above fails, so it
        # is not left running after the test.
        df.stop()


def test_filter():
    """Filtering a Batch keeps matching elements across batches, in order."""
    source = Batch()

    def is_even(value):
        return value % 2 == 0

    evens = source.filter(is_even).to_stream()
    seen = evens.sink_to_list()

    for batch in ([1, 2, 3, 4], [5, 6]):
        source.emit(batch)

    assert seen == [2, 4, 6]