File: test_multicast.py

package info (click to toggle)
python-stomp 8.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 632 kB
  • sloc: python: 4,176; makefile: 248; xml: 42; sh: 1
file content (103 lines) | stat: -rw-r--r-- 3,176 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
from stomp.adapter.multicast import MulticastConnection
from stomp.listener import TestListener
from .testutils import *


@pytest.fixture
def conn():
    conn = MulticastConnection()
    listener = TestListener("123", print_to_log=True)
    conn.set_listener("testlistener", listener)
    conn.connect()
    yield conn
    conn.disconnect(receipt=None)


@pytest.fixture
def connutf16():
    conn = MulticastConnection(encoding="utf-16")
    listener = TestListener("123", print_to_log=True)
    conn.set_listener("testlistener", listener)
    conn.connect()
    yield conn
    conn.disconnect(receipt=None)


class TestMulticast(object):

    def testsubscribe(self, conn):
        listener = conn.get_listener("testlistener")
        queuename = "/queue/test1-%s" % listener.timestamp
        conn.subscribe(destination=queuename, id=1, ack="auto")

        conn.send(body="this is a test", destination=queuename, receipt="123")

        validate_send(conn, 1, 1, 0)

    def testunsubscribe(self, conn):
        listener = conn.get_listener("testlistener")
        queuename = "/queue/test1-%s" % listener.timestamp
        conn.subscribe(destination=queuename, id=1, ack="auto")

        conn.send(body="this is a test", destination=queuename, receipt="123")

        validate_send(conn, 1, 1, 0)

        conn.unsubscribe(1)

        conn.send(body="this is a test", destination=queuename, receipt="124")

        time.sleep(3)

        assert listener.messages == 1, "should have only received 1 message"

    def testtransactions(self, conn):
        listener = conn.get_listener("testlistener")
        queuename = "/queue/test1-%s" % listener.timestamp
        conn.subscribe(destination=queuename, id=1, ack="auto")

        trans_id = get_uuid()
        conn.begin(trans_id)

        conn.send(body="this is a test", transaction=trans_id, destination=queuename, receipt="123")

        time.sleep(1)

        assert listener.messages == 0, "should not have received any messages"

        conn.commit(trans_id)

        listener.wait_on_receipt()

        assert listener.messages == 1, "should have received 1 message"

        conn.begin(trans_id)

        conn.send(body="this is a test", transaction=trans_id, destination=queuename, receipt="124")

        conn.abort(trans_id)

        time.sleep(3)

        assert listener.messages == 1, "should have only received 1 message"


class TestNonAsciiViaMulticast(object):

    def test_send_nonascii_auto_encoding(self, connutf16):
        listener = connutf16.get_listener("testlistener")
        queuename = "/queue/multicast-nonascii-%s" % listener.timestamp
        connutf16.subscribe(destination=queuename, ack="auto", id="1")

        txt = test_text_for_utf16
        connutf16.send(body=txt, destination=queuename, receipt="123")

        listener.wait_for_message()

        assert listener.connections >= 1, "should have received 1 connection acknowledgement"
        assert listener.messages >= 1, "should have received 1 message"
        assert listener.errors == 0, "should not have received any errors"

        (_, msg) = listener.get_latest_message()

        assert txt.encode("utf-8") == msg