File: test_s12.py

package info (click to toggle)
python-stomp 8.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 632 kB
  • sloc: python: 4,176; makefile: 248; xml: 42; sh: 1
file content (135 lines) | stat: -rw-r--r-- 4,813 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import stomp
from stomp import exception
from stomp.listener import TestListener
from .testutils import *


@pytest.fixture()
def conn():
    conn = stomp.Connection12(get_default_host())
    listener = TestListener("123", print_to_log=True)
    conn.set_listener("testlistener", listener)
    conn.connect(get_default_user(), get_default_password(), wait=True)
    yield conn
    conn.disconnect(receipt=None)


class Test12Connect(object):

    def test_send(self, conn):
        listener = conn.get_listener("testlistener")
        queuename = "/queue/testsend12-%s" % listener.timestamp
        conn.subscribe(destination=queuename, id=1, ack="auto")

        conn.send(body="this is a test using protocol 1.2", destination=queuename, receipt="123")

        validate_send(conn, 1, 1, 0)

    def test_clientack(self, conn):
        listener = conn.get_listener("testlistener")
        queuename = "/queue/testclientack12-%s" % listener.timestamp
        conn.subscribe(destination=queuename, id=1, ack="client-individual")

        conn.send(body="this is a test", destination=queuename, receipt="123")

        listener.wait_for_message()

        (headers, _) = listener.get_latest_message()

        ack_id = headers["ack"]

        conn.ack(ack_id)

    def test_clientnack(self, conn):
        listener = conn.get_listener("testlistener")
        queuename = "/queue/testclientnack12-%s" % listener.timestamp
        conn.subscribe(destination=queuename, id=1, ack="client-individual")

        conn.send(body="this is a test", destination=queuename, receipt="123")

        listener.wait_for_message()

        (headers, _) = listener.get_latest_message()

        ack_id = headers["ack"]

        conn.nack(ack_id)

    def test_should_send_extra_header_clientnack(self, conn, mocker):
        listener = conn.get_listener("testlistener")
        queuename = "/queue/testclientnack12-%s" % listener.timestamp
        conn.subscribe(destination=queuename, id=1, ack="client-individual")

        conn.send(body="this is a test", destination=queuename, receipt="123")

        listener.wait_for_message()

        (headers, _) = listener.get_latest_message()

        ack_id = headers["ack"]

        wrapped_send_frame = mocker.patch.object(conn, "send_frame", wraps=conn.send_frame)
        conn.nack(ack_id, requeue="false")
        expected_headers = {HDR_ID: ack_id.replace(':', '\\c'), "requeue": "false"}
        wrapped_send_frame.assert_called_with(CMD_NACK, expected_headers)

    def test_timeout(self):
        server = StubStompServer("127.0.0.1", 60000)
        try:
            server.start()

            server.add_frame('''ERROR
message: connection failed\x00''')

            conn = stomp.Connection12([("127.0.0.1", 60000)])
            listener = TestListener(print_to_log=True)
            conn.set_listener('', listener)
            try:
                conn.connect(wait=True)
                self.fail("shouldn't happen")
            except exception.ConnectFailedException:
                pass
        finally:
            server.stop()

    def test_specialchars(self, conn):
        listener = conn.get_listener("testlistener")
        queuename = "/queue/testspecialchars12-%s" % listener.timestamp
        conn.subscribe(destination=queuename, id=1, ack="client")

        hdrs = {
            "special-1": "test with colon : test",
            "special-2": "test with backslash \\ test",
            "special-3": "test with newlines \n \n",
            "special-4": "test with carriage return \r"
        }

        conn.send(body="this is a test", headers=hdrs, destination=queuename, receipt="123")

        listener.wait_for_message()

        (headers, _) = listener.get_latest_message()

        _ = headers["message-id"]
        _ = headers["subscription"]
        assert "special-1" in headers
        assert "test with colon : test" == headers["special-1"]
        assert "special-2" in headers
        assert "test with backslash \\ test" == headers["special-2"]
        assert "special-3" in headers
        assert "test with newlines \n \n" == headers["special-3"]
        assert "special-4" in headers
        cr_header = headers["special-4"].replace('\\r', '\r')
        assert "test with carriage return \r" == cr_header

    def test_suppress_content_length(self, conn, mocker):
        listener = conn.get_listener("testlistener")
        queuename = "/queue/testspecialchars12-%s" % listener.timestamp
        conn = stomp.Connection12(get_default_host(), vhost=get_default_vhost(), auto_content_length=False)
        conn.transport = mocker.Mock()

        conn.send(body="test", destination=queuename, receipt="123")

        args, kwargs = conn.transport.transmit.call_args
        frame = args[0]
        assert "content-length" not in frame.headers