File: test_ss.py

package info (click to toggle)
python-stomp 8.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 632 kB
  • sloc: python: 4,176; makefile: 248; xml: 42; sh: 1
file content (205 lines) | stat: -rw-r--r-- 6,198 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
import stomp
from stomp.listener import *
from .testutils import *


@pytest.fixture
def server():
    """Yield a started ``StubStompServer`` and stop it once the test is done.

    The stub is created with host ``127.0.0.1`` and port ``60000``, which are
    the same values the tests in this module pass to ``stomp.Connection``.
    """
    stub = StubStompServer("127.0.0.1", 60000)
    stub.start()
    # Everything after the yield runs as fixture teardown.
    yield stub
    stub.stop()


class TestWithStompServer(object):
    """Connection tests driven against the ``server`` stub fixture.

    Each test queues the frames the stub should hand back with
    ``server.add_frame`` and then exercises a ``stomp.Connection`` against
    ``127.0.0.1:60000``.
    """

    # Canned CONNECTED reply queued on the stub before each connect attempt.
    _CONNECTED_FRAME = '''CONNECTED
version:1.1
session:1
server:test
heart-beat:1000,1000

\x00'''

    def test_alternate_hosts(self, server):
        """Connect using a host list whose first entry is not the stub server.

        The first address lies in 192.0.2.0/24, a range reserved for
        documentation (RFC 5737), so presumably that attempt cannot succeed
        and the connection has to move on to the stub's address. The test
        has no explicit assertion: it fails only if ``connect(wait=True)``
        raises.
        """
        server.add_frame(self._CONNECTED_FRAME)
        # NOTE(review): this sets a module-level flag that is not restored
        # here, so it stays set for whatever runs afterwards in the process.
        stomp.logging.verbose = True
        conn = stomp.Connection(
            [("192.0.2.0", 10000), ("127.0.0.1", 60000)],
            timeout=1,
            prefer_localhost=False,
        )
        listener = TestListener(print_to_log=True)
        conn.set_listener('', listener)
        conn.connect(wait=True)

    def test_disconnect(self, server):
        """Stop the server mid-session and check the client can connect again.

        Steps: connect, stop the stub, verify that sending now raises,
        restart the stub, reconnect, and finally require that the listener
        counted at least two connections.
        """
        server.add_frame(self._CONNECTED_FRAME)

        conn = stomp.Connection([("127.0.0.1", 60000)])
        listener = TestListener(print_to_log=True)
        conn.set_listener('', listener)
        conn.connect()

        time.sleep(2)

        server.stop()

        # Poll ``server.stopped`` up to 100 times, 0.1s apart.
        for _ in range(100):
            if server.stopped:
                break
            time.sleep(0.1)
        else:
            # pytest.fail instead of ``assert False`` so the check survives
            # running with assertions disabled.
            pytest.fail("server never disconnected")

        time.sleep(1)

        # Only the send itself is guarded; any exception from it is the
        # expected outcome, a clean return is the failure case.
        try:
            conn.send(body="test disconnect", destination="/test/disconnectqueue")
        except AssertionError as err:
            # An assertion error is a genuine failure, not the expected
            # "connection lost" error, so report it as one.
            pytest.fail(str(err))
        except Exception as err:
            logging.debug("stopping conn after expected exception %s", err)
        else:
            pytest.fail("Should not have successfully sent a message at this point")

        # lost connection, now restart the server
        try:
            conn.disconnect(receipt=None)
        except Exception as err:
            # Best effort: the connection is already broken, so an error
            # while disconnecting is tolerated, but logged rather than
            # silently discarded.
            logging.debug("ignoring error during disconnect: %s", err)

        time.sleep(2)

        server.add_frame(self._CONNECTED_FRAME)

        server.start()

        conn.connect()

        time.sleep(5)

        assert listener.connections >= 2, "should have received 2 connection acknowledgements"

    def test_parsing(self, server):
        """Check frame parsing for several EOL/heartbeat/message layouts.

        The stub is loaded with frames, ``pump`` makes it release them, and
        after each scenario the listener's heartbeat count and latest
        message are compared with what was queued.
        """

        def pump(n):
            # pump; test server gives us one frame per received something
            if n > 0:
                logging.debug("pump sending %s frames", n)
            for _ in range(n):
                conn.transport.send(b"\n")
                time.sleep(0.01)

        # Trailing optional EOLs in a frame

        server.add_frame(self._CONNECTED_FRAME + "\n\n\n")
        expected_heartbeat_count = 0

        conn = stomp.Connection([("127.0.0.1", 60000)], heartbeats=(1000, 1000))
        listener = TestListener(print_to_log=True)
        conn.set_listener('', listener)
        conn.connect()

        logging.info("test parsing (1) expected hb count is %s", expected_heartbeat_count)
        assert expected_heartbeat_count == listener.heartbeat_count, "(1) expected hb count %s, was %s" % (expected_heartbeat_count, listener.heartbeat_count)

        # No trailing EOLs, separate heartbeat

        message_body = "Hello\n...world!"
        message_frame = '''MESSAGE
content-type:text/plain

%s\x00''' % message_body

        server.add_frame(message_frame)
        server.add_frame("\n")
        expected_heartbeat_count += 1

        pump(2)

        logging.info("test parsing (2) expected hb count is %s", expected_heartbeat_count)

        listener.wait_for_heartbeat()
        headers, body = listener.get_latest_message()

        assert expected_heartbeat_count == listener.heartbeat_count, "(2) expected hb count %s, was %s" % (expected_heartbeat_count, listener.heartbeat_count)
        assert {"content-type": "text/plain"} == headers
        assert message_body == body

        # Trailing EOL, separate heartbeat, another message

        server.add_frame(message_frame + "\n")
        server.add_frame("\n")
        server.add_frame(message_frame + "\n")
        expected_heartbeat_count += 1

        pump(3)

        logging.info("test parsing (3) expected hb count is %s", expected_heartbeat_count)

        listener.wait_for_heartbeat()
        listener.wait_for_message()
        headers, body = listener.get_latest_message()

        assert expected_heartbeat_count == listener.heartbeat_count, "(3) expected hb count %s, was %s" % (expected_heartbeat_count, listener.heartbeat_count)
        assert {"content-type": "text/plain"} == headers
        assert message_body == body

        # Torture tests: return content one byte at a time

        server.add_frame("\n")
        server.add_frame(message_frame)
        server.add_frame("\n")
        expected_heartbeat_count += 2

        pump(len(message_frame) + 2)

        logging.info("test parsing (4) expected hb count is %s", expected_heartbeat_count)

        listener.wait_for_heartbeat()
        headers, body = listener.get_latest_message()

        assert expected_heartbeat_count == listener.heartbeat_count, "(4) expected hb count %s, was %s" % (expected_heartbeat_count, listener.heartbeat_count)
        assert {"content-type": "text/plain"} == headers
        assert message_body == body

        # ...and a similar one with content-length and null bytes in body

        message_body = "%s\x00\x00%s" % (message_body, message_body)
        message_frame = '''MESSAGE
content-type:text/plain
content-length:%s

%s\x00''' % (len(message_body), message_body)

        server.add_frame("\n")
        server.add_frame("\n")
        server.add_frame(message_frame)
        server.add_frame("\n")
        expected_heartbeat_count += 3

        pump(len(message_frame) + 3)

        logging.info("test parsing (5) expected hb count is %s", expected_heartbeat_count)

        listener.wait_for_heartbeat()
        headers, body = listener.get_latest_message()

        assert expected_heartbeat_count == listener.heartbeat_count, "(5) expected hb count %s, was %s" % (expected_heartbeat_count, listener.heartbeat_count)
        assert {
            "content-type": "text/plain",
            "content-length": str(len(message_body)),
        } == headers
        assert message_body == body