File: test_sinks.py

package info (click to toggle)
python-streamz 0.6.4-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 824 kB
  • sloc: python: 6,714; makefile: 18; sh: 18
file content (105 lines) | stat: -rw-r--r-- 2,418 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import weakref

import pytest
from streamz import Stream
from streamz.sinks import _global_sinks, Sink
from streamz.utils_test import tmpfile, wait_for


def test_sink_with_args_and_kwargs():
    L = dict()

    def mycustomsink(elem, key, prefix=""):
        key = prefix + key
        if key not in L:
            L[key] = list()
        L[key].append(elem)

    s = Stream()
    sink = s.sink(mycustomsink, "cat", "super", stream_name="test")
    s.emit(1)
    s.emit(2)

    assert L['supercat'] == [1, 2]
    assert sink.name == "test"


def test_sink_to_textfile_fp():
    source = Stream()
    with tmpfile() as filename, open(filename, "w") as fp:
        source.map(str).sink_to_textfile(fp)
        source.emit(0)
        source.emit(1)

        fp.flush()

        assert open(filename, "r").read() == "0\n1\n"


def test_sink_to_textfile_named():
    source = Stream()
    with tmpfile() as filename:
        _sink = source.map(str).sink_to_textfile(filename)
        source.emit(0)
        source.emit(1)

        _sink._fp.flush()

        assert open(filename, "r").read() == "0\n1\n"


def test_sink_to_textfile_closes():
    source = Stream()
    with tmpfile() as filename:
        sink = source.sink_to_textfile(filename)
        fp = sink._fp
        _global_sinks.remove(sink)
        del sink

        with pytest.raises(ValueError, match=r"I/O operation on closed file\."):
            fp.write(".")


def test_sink_destroy():
    source = Stream()
    sink = Sink(source)
    ref = weakref.ref(sink)
    sink.destroy()

    assert sink not in _global_sinks

    del sink

    assert ref() is None


def test_ws_roundtrip():
    pytest.importorskip("websockets")
    s0 = Stream.from_websocket("localhost", 8989, start=True)
    l = s0.sink_to_list()

    data = [b'0123'] * 4
    s = Stream.from_iterable(data)
    s.to_websocket("ws://localhost:8989")
    s.start()

    wait_for(lambda: data == l, timeout=1)
    s.stop()
    s0.stop()


def test_mqtt_roundtrip():
    pytest.importorskip("paho.mqtt.client")
    s0 = Stream.from_mqtt("mqtt.eclipseprojects.io", 1883, "streamz/sensor/temperature")
    l = s0.map(lambda msg: msg.payload).sink_to_list()
    s0.start()

    data = [b'0123'] * 4
    s = Stream.from_iterable(data)
    s.to_mqtt("mqtt.eclipseprojects.io", 1883, "streamz/sensor/temperature")
    s.start()

    wait_for(lambda: data == l, timeout=1)
    s.stop()
    s0.stop()