1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
|
from streamz import Stream
from time import sleep
def increment(x):
    """Return ``x`` plus one."""
    return x + 1
def decrement(x):
    """Return ``x`` minus one."""
    return x - 1
def num_distinct(state, new):
    """Record ``new`` in ``state`` and report how many distinct items it holds.

    ``state`` is updated in place via ``state.add`` and the same object is
    handed back, so it can be threaded through successive calls.

    Returns:
        A ``(state, count)`` pair where ``count`` is ``len(state)`` after
        ``new`` has been added.
    """
    state.add(new)
    return state, len(state)
def test_stream_inc():
    """Values emitted on a stream mapped with ``increment`` arrive at the sink plus one."""
    source = Stream()
    res = []
    # Pipeline: source -> increment -> append to res.
    source.map(increment).sink(res.append)
    source.emit(1)
    source.emit(5)
    assert res == [2, 6]
def test_stream_num_distinct():
    """Accumulating with ``num_distinct`` yields a running count of distinct items."""
    source = Stream()
    res = []
    # returns_state=True: num_distinct hands back (state, result) and only
    # the result part is expected downstream; the set is the carried state.
    source.accumulate(num_distinct, returns_state=True, start=set()).sink(res.append)
    source.emit('cat')
    source.emit('dog')
    source.emit('cat')  # repeat: the distinct count must not grow
    source.emit('mouse')
    assert res == [1, 2, 2, 3]
def test_stream_branch():
    """Two branches of one source can be zipped back together and summed."""
    source = Stream()
    res = []
    a = source.map(increment)
    b = source.map(decrement)
    # Pair the two branch outputs, then add each pair up.
    a.zip(b).map(sum).sink(res.append)
    source.emit(10)
    # 10 becomes (11, 9) across the branches; sum() gives the integer 20.
    # This must be a real assert: a bare comparison is evaluated and
    # discarded, so the test could never fail.
    assert res == [20]
|