File: test_rabbitmq.py

package info (click to toggle)
python-stomp 8.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 632 kB
  • sloc: python: 4,176; makefile: 248; xml: 42; sh: 1
file content (34 lines) | stat: -rw-r--r-- 1,261 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import stomp
from stomp.listener import TestListener, WaitingListener
from .testutils import *


@pytest.fixture()
def conn():
    """Yield a connected ``stomp.Connection11`` for the RabbitMQ tests.

    Two listeners are registered before connecting:

    * name ``"123"``: a ``TestListener`` built with ``"123"`` and
      ``print_to_log=True``
    * name ``"456"``: a ``WaitingListener`` built with ``"456"``

    Tests look them up again with ``conn.get_listener(<name>)``.

    Teardown: when the test leaves the connection open (for example
    because it failed before reaching its own ``disconnect`` call), the
    connection is closed here so it does not leak past the test.
    """
    conn = stomp.Connection11(get_rabbitmq_host())
    listener = TestListener("123", print_to_log=True)
    listener2 = WaitingListener("456")
    conn.set_listener("123", listener)
    conn.set_listener("456", listener2)
    conn.connect(get_rabbitmq_user(), get_rabbitmq_password(), wait=True)
    yield conn
    # pytest resumes the fixture here once the test has finished, whether
    # it passed or failed.  Guard on the connection state so a test that
    # already disconnected is not asked to disconnect a second time.
    if conn.is_connected():
        conn.disconnect()


class TestRabbitMQSend:
    """Send/receive check run against the broker behind the ``conn`` fixture."""

    def test_send(self, conn):
        """Send one message to a subscribed queue and verify listener counters.

        The test subscribes to a queue, sends a single message to that same
        queue with receipt ``"123"``, waits on the ``"123"`` listener, then
        disconnects with receipt ``"456"`` and waits on the ``"456"``
        listener.  Finally it asserts the counters exposed by the ``"123"``
        listener: one connection, one message, no errors, one disconnect.
        """
        # Both listeners are looked up by the names they were registered under.
        disconnect_waiter = conn.get_listener("456")
        stats = conn.get_listener("123")

        # The queue name is suffixed with the listener's timestamp attribute.
        destination = "/queue/test-%s" % stats.timestamp

        conn.subscribe(destination=destination, id=1, ack="auto")
        conn.send(
            body="this is a test",
            destination=destination,
            receipt="123",
        )
        stats.wait_on_receipt()

        conn.disconnect(receipt="456")
        disconnect_waiter.wait_on_receipt()

        assert stats.connections == 1, "should have received 1 connection acknowledgement"
        assert stats.messages == 1, "should have received 1 message"
        assert stats.errors == 0, "should not have received any errors"
        assert stats.disconnects == 1, (
            "should have received 1 disconnect, was %s" % stats.disconnects
        )