from streamz import Stream
from time import sleep

def increment(x):
    """Return ``x`` plus one."""
    successor = x + 1
    return successor

def decrement(x):
    """Return ``x`` minus one."""
    predecessor = x - 1
    return predecessor

def num_distinct(state, new):
    """Record ``new`` in ``state`` and report how many items it now holds.

    ``state`` is mutated in place via its ``add`` method and the very same
    object is handed back, paired with its current length.

    Returns:
        A ``(state, count)`` tuple, where ``count`` is ``len(state)`` after
        ``new`` has been added.
    """
    state.add(new)
    distinct_count = len(state)
    return (state, distinct_count)

def test_stream_inc():
    """Values pushed through ``map(increment)`` reach the sink incremented."""
    collected = []
    stream = Stream()

    # Build the pipeline: stream -> increment -> append to ``collected``.
    incremented = stream.map(increment)
    incremented.sink(collected.append)

    for value in (1, 5):
        stream.emit(value)

    assert collected == [2, 6]

def test_stream_num_distinct():
    """``accumulate`` with ``num_distinct`` emits the running distinct count."""
    collected = []
    stream = Stream()

    # ``returns_state=True`` because ``num_distinct`` returns a
    # (state, value) pair rather than a single value.
    running_count = stream.accumulate(
        num_distinct,
        returns_state=True,
        start=set(),
    )
    running_count.sink(collected.append)

    # The repeated 'cat' must not raise the count.
    for animal in ('cat', 'dog', 'cat', 'mouse'):
        stream.emit(animal)

    assert collected == [1, 2, 2, 3]

def test_stream_branch():
    """Two branches of one source can be zipped back together and summed.

    The source feeds an ``increment`` branch and a ``decrement`` branch.
    Emitting 10 yields 11 and 9 on the respective branches, so the zipped
    pair sums to 20.
    """
    source = Stream()
    res = []
    a = source.map(increment)
    b = source.map(decrement)
    a.zip(b).map(sum).sink(res.append)
    source.emit(10)
    # Assert on the whole list (not just res[0]) so that a missing or
    # duplicated emission fails the test too. The sum is the int 20.
    assert res == [20]

