File: sample_test.py

package info (click to toggle)
python-streamz 0.6.4-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 824 kB
  • sloc: python: 6,714; sh: 18; makefile: 17
file content (40 lines) | stat: -rw-r--r-- 863 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from streamz import Stream
from time import sleep

def increment(x):
    """Return ``x`` plus one."""
    successor = x + 1
    return successor

def decrement(x):
    """Return ``x`` minus one."""
    predecessor = x - 1
    return predecessor

def num_distinct(state, new):
    """Record ``new`` in ``state`` and report how many items it now holds.

    ``state`` is updated in place through ``state.add(new)``. The return
    value is the pair ``(state, len(state))``: the same container that was
    passed in, followed by its size after the insertion.
    """
    state.add(new)
    distinct_count = len(state)
    return state, distinct_count

def test_stream_inc():
    """Values emitted into a stream mapped with ``increment`` reach the sink plus one."""
    collected = []
    stream = Stream()
    incremented = stream.map(increment)
    incremented.sink(collected.append)
    for value in (1, 5):
        stream.emit(value)
    assert collected == [2, 6]

def test_stream_num_distinct():
    """Accumulating with ``num_distinct`` sinks the running count of distinct items."""
    collected = []
    stream = Stream()
    # returns_state=True matches num_distinct's (state, value) return pair.
    running_counts = stream.accumulate(
        num_distinct, returns_state=True, start=set()
    )
    running_counts.sink(collected.append)
    for animal in ('cat', 'dog', 'cat', 'mouse'):
        stream.emit(animal)
    assert collected == [1, 2, 2, 3]

def test_stream_branch():
    """Two branches of one source, zipped and summed, yield a single value.

    The source feeds an ``increment`` branch and a ``decrement`` branch.
    The branches are zipped, each zipped pair is passed to ``sum``, and the
    result is appended to ``res``.
    """
    source = Stream()
    res = list()
    a = source.map(increment)
    b = source.map(decrement)
    a.zip(b).map(sum).sink(res.append)
    source.emit(10)
    # (10 + 1) + (10 - 1) == 20. The original line was a bare comparison
    # against the string '20' with no assert, so it could never fail.
    assert res == [20]