File: test_framing.py

package info (click to toggle)
python-tubes 0.2.1-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 740 kB
  • sloc: python: 3,215; makefile: 149
file content (184 lines) | stat: -rw-r--r-- 5,266 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for framing protocols.
"""

from unittest import TestCase

from tubes.framing import bytesToNetstrings

from tubes.test.util import FakeFount, FakeDrain
from tubes.tube import tube, series

from tubes.framing import (netstringsToBytes, bytesToLines, linesToBytes,
                       bytesToIntPrefixed, intPrefixedToBytes)

class NetstringTests(TestCase):
    """
    Tests for parsing netstrings.
    """

    def test_stringToNetstring(self):
        """
        A byte-string is given a length prefix.
        """
        ff = FakeFount()
        fd = FakeDrain()
        ff.flowTo(series(bytesToNetstrings())).flowTo(fd)
        ff.drain.receive(b"hello")
        self.assertEqual(
            fd.received, [b"%(len)d:%(data)s," %
                          {b"len": len(b"hello"), b"data": b"hello"}]
        )


    def test_bytesToNetstrings(self):
        """
        L{bytesToNetstrings} works on subsequent inputs as well.
        """
        ff = FakeFount()
        fd = FakeDrain()
        ff.flowTo(series(bytesToNetstrings())).flowTo(fd)
        ff.drain.receive(b"hello")
        ff.drain.receive(b"world")
        self.assertEqual(
            b"".join(fd.received),
            b"%(len)d:%(data)s,%(len2)d:%(data2)s," % {
                b"len": len(b"hello"), b"data": b"hello",
                b"len2": len(b"world"), b"data2": b"world",
            }
        )


    def test_netstringToString(self):
        """
        Length prefix is stripped off.
        """
        ff = FakeFount()
        fd = FakeDrain()
        ff.flowTo(series(netstringsToBytes())).flowTo(fd)
        ff.drain.receive(b"1:x,2:yz,3:")
        self.assertEqual(fd.received, [b"x", b"yz"])



class LineTests(TestCase):
    """
    Tests for parsing delimited data ("lines").
    """

    def test_stringToLines(self):
        """
        A line is something delimited by a LF or CRLF.
        """
        def splitALine(newline):
            ff = FakeFount()
            fd = FakeDrain()
            ff.flowTo(series(bytesToLines())).flowTo(fd)
            ff.drain.receive(newline.join([b"alpha", b"beta", b"gamma"]))
            self.assertEqual(fd.received, [b"alpha", b"beta"])
        splitALine(b"\n")
        splitALine(b"\r\n")


    def test_linesToBytes(self):
        """
        Writing out lines delimits them, with the delimiter.
        """
        ff = FakeFount()
        fd = FakeDrain()
        ff.flowTo(series(linesToBytes())).flowTo(fd)
        ff.drain.receive(b"hello")
        ff.drain.receive(b"world")
        self.assertEqual(b"".join(fd.received), b"hello\r\nworld\r\n")


    def test_rawMode(self):
        """
        You should be able to have some lines, and then some bytes, and then
        some lines.
        """

        lines = bytesToLines()
        ff = FakeFount()
        fd = FakeDrain()

        @tube
        class Switcher(object):
            def received(self, line):
                splitted = line.split(b" ", 1)
                if splitted[0] == b'switch':
                    length = int(splitted[1])
                    lines.divert(series(Switchee(length), fd))

        @tube
        class Switchee(object):
            datums = []
            def __init__(self, length):
                self.length = length
            def received(self, data):
                self.datums.append(data)

        cc = series(lines, Switcher())
        ff.flowTo(cc).flowTo(fd)
        ff.drain.receive(b"hello\r\nworld\r\nswitch 10\r\nabcde\r\nfgh"
                         # + '\r\nagain\r\n'
                         )
        self.assertEqual(b"".join(Switchee.datums), b"abcde\r\nfgh")


    def test_switchingWithMoreDataToDeliver(self):
        """
        Switching drains should immediately stop delivering data.
        """
        lines = bytesToLines()
        ff = FakeFount()
        fd1 = FakeDrain()
        fd2 = FakeDrain()

        @tube
        class Switcher(object):
            def received(self, line):
                if b'switch' in line:
                    lines.divert(series(netstringsToBytes(), fd2))
                else:
                    yield line

        cc = series(lines, Switcher())
        ff.flowTo(cc).flowTo(fd1)
        ff.drain.receive(b'something\r\nswitch\r\n7:hello\r\n,5:world,')
        self.assertEqual(fd1.received, [b"something"])
        self.assertEqual(fd2.received, [b'hello\r\n', b'world'])



class PackedPrefixTests(TestCase):
    """
    Test cases for `packedPrefix`.
    """

    def test_prefixIn(self):
        """
        Parse some prefixed data.
        """
        packed = bytesToIntPrefixed(8)
        ff = FakeFount()
        fd = FakeDrain()
        ff.flowTo(series(packed)).flowTo(fd)
        ff.drain.receive(b"\x0812345678\x02")
        self.assertEqual(fd.received, [b"12345678"])


    def test_prefixOut(self):
        """
        Emit some prefixes.
        """
        packed = intPrefixedToBytes(8)
        ff = FakeFount()
        fd = FakeDrain()
        ff.flowTo(series(packed, fd))
        ff.drain.receive(b'a')
        ff.drain.receive(b'bc')
        ff.drain.receive(b'def')
        self.assertEqual(fd.received, [b'\x01a', b'\x02bc', b'\x03def'])