File: test_graph.py

package info (click to toggle)
python-streamz 0.6.4-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 824 kB
  • sloc: python: 6,714; makefile: 18; sh: 18
file content (86 lines) | stat: -rw-r--r-- 2,170 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
from operator import add, mul
import os

import pytest
nx = pytest.importorskip('networkx')

from streamz import Stream, create_graph, visualize
from streamz.utils_test import tmpfile

from ..graph import _clean_text


def test_create_graph():
    """Verify ``create_graph`` fills a DiGraph with a pipeline's nodes/edges.

    The pipeline zips two named sources, maps ``add`` over the result and
    sinks into the first source's ``emit``.  The test expects every stream
    to appear in the graph under its ``hash`` and every connection to appear
    as a directed edge between the corresponding hashes.
    """
    source1 = Stream(stream_name='source1')
    source2 = Stream(stream_name='source2')

    zipped = source1.zip(source2)
    summed = zipped.map(add)
    sink = summed.sink(source1.emit)

    graph = nx.DiGraph()
    create_graph(summed, graph)

    # Every stream in the pipeline must be present as a node.
    for stream in (source1, source2, zipped, summed, sink):
        assert hash(stream) in graph

    # Every upstream -> downstream connection must be present as an edge.
    expected_links = (
        (source1, zipped),
        (source2, zipped),
        (zipped, summed),
        (summed, sink),
    )
    for upstream, downstream in expected_links:
        assert (hash(upstream), hash(downstream)) in graph.edges()


def test_create_cyclic_graph():
    """Verify ``create_graph`` copes with a pipeline that loops back.

    The mapped node is connected back to the first source.  The test
    expects all four streams as nodes, a truthy result from
    ``nx.find_cycle`` and the back edge from the mapped node to the source.
    """
    source1 = Stream(stream_name='source1')
    source2 = Stream(stream_name='source2')

    zipped = source1.zip(source2)
    summed = zipped.map(add)
    summed.connect(source1)  # close the loop: summed feeds source1

    graph = nx.DiGraph()
    create_graph(summed, graph)

    for stream in (source1, source2, zipped, summed):
        assert hash(stream) in graph

    assert nx.find_cycle(graph)

    expected_links = (
        (source1, zipped),
        (source2, zipped),
        (zipped, summed),
        (summed, source1),
    )
    for upstream, downstream in expected_links:
        assert (hash(upstream), hash(downstream)) in graph.edges()


def test_create_file():
    """Verify ``visualize`` writes png, svg and dot output files.

    For png and svg the test only checks that the file exists afterwards.
    For dot it reads the file back and checks that a set of expected
    substrings (layout option, stream names, operation names, node shapes)
    occurs in the text.
    """
    source1 = Stream(stream_name='source1')
    source2 = Stream(stream_name='source2')

    zipped = source1.zip(source2)
    summed = zipped.map(add)
    accumulated = summed.scan(mul)
    tail = accumulated.map(lambda x: x + 1)
    tail.sink(source1.emit)

    # Module-level function form, default options.
    with tmpfile(extension='png') as png_path:
        visualize(zipped, filename=png_path)
        assert os.path.exists(png_path)

    # Method form, with a layout option passed through.
    with tmpfile(extension='svg') as svg_path:
        zipped.visualize(filename=svg_path, rankdir="LR")
        assert os.path.exists(svg_path)

    with tmpfile(extension='dot') as dot_path:
        zipped.visualize(filename=dot_path, rankdir="LR")
        with open(dot_path) as handle:
            dot_source = handle.read()

        expected_fragments = (
            'rankdir', 'source1', 'source2', 'zip', 'map', 'add',
            'shape=box', 'shape=ellipse',
        )
        for fragment in expected_fragments:
            assert fragment in dot_source


def test_cleantext():
    """Verify ``_clean_text`` maps a punctuation-heavy string as expected."""
    raw = "JFDSM*(@&$:FFDS:;;"
    assert _clean_text(raw) == "JFDSM ;FFDS; "