File: test_context.py

package info (click to toggle)
python-stomp 8.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 632 kB
  • sloc: python: 4,176; makefile: 248; xml: 42; sh: 1
file content (45 lines) | stat: -rw-r--r-- 1,630 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import stomp
from stomp.listener import TestListener
from .testutils import *


class TestContext(object):
    """Tests that use the STOMP connection classes as context managers.

    Every test opens a connection inside a ``with`` block, pushes a single
    message through a timestamped queue, and inspects the listener's
    counters once the block has been left.
    """

    # Computed once, when the class body executes; used as the queue suffix.
    timestamp = time.strftime("%Y%m%d%H%M%S")

    def send_test_message(self, conn):
        """Connect ``conn`` and round-trip one message through a test queue.

        A ``TestListener`` is registered under the name ``"testlistener"``
        before connecting, so the other helpers can look it up by that name.
        The method returns after the listener's ``wait_for_message()`` call
        returns.

        :param conn: connection object to drive (not yet connected).
        """
        test_listener = TestListener("123", print_to_log=True)
        conn.set_listener("testlistener", test_listener)
        conn.connect(get_default_user(), get_default_password(), wait=True)

        destination = "/queue/test1-%s" % self.timestamp
        conn.subscribe(destination=destination, id=1, ack="auto")
        conn.send(body="this is a test", destination=destination, receipt="123")
        test_listener.wait_for_message()

    @staticmethod
    def after_test_message(conn):
        """Unregister the listener that ``send_test_message`` installed."""
        conn.remove_listener("testlistener")

    @staticmethod
    def check_asserts(conn):
        """Verify the counters recorded by the ``"testlistener"`` listener.

        Expects exactly one connection acknowledgement and at least one
        disconnect to have been counted.

        :param conn: connection whose ``"testlistener"`` listener is checked.
        """
        registered = conn.get_listener("testlistener")
        connection_count = registered.connections
        assert connection_count == 1, "should have received 1 connection acknowledgement"
        disconnect_count = registered.disconnects
        assert disconnect_count >= 1, "should have received 1 disconnect"

    def _exercise_context(self, connection_class):
        """Run the shared scenario against one connection class.

        The checks run after the ``with`` block on purpose: the tests expect
        the listener to have counted a disconnect by the time the block has
        exited.

        :param connection_class: callable that takes the host argument and
            returns a context-manager connection.
        """
        with connection_class(get_default_host()) as conn:
            self.send_test_message(conn)
        self.check_asserts(conn)
        self.after_test_message(conn)

    def test_with_context_stomp11(self):
        """Context-manager scenario using ``stomp.Connection11``."""
        self._exercise_context(stomp.Connection11)

    def test_with_context_stomp10(self):
        """Context-manager scenario using ``stomp.Connection10``."""
        self._exercise_context(stomp.Connection10)

    def test_with_context_stomp12(self):
        """Context-manager scenario using ``stomp.Connection12``."""
        self._exercise_context(stomp.Connection12)