File: test_activemq.py

package info (click to toggle)
python-stomp 8.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 632 kB
  • sloc: python: 4,176; makefile: 248; xml: 42; sh: 1
file content (24 lines) | stat: -rw-r--r-- 718 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import stomp
from stomp.listener import TestListener
from .testutils import *


@pytest.fixture()
def conn():
    conn = stomp.Connection11(get_default_host())
    conn.set_listener("testlistener", TestListener("123", print_to_log=True))
    conn.connect(get_default_user(), get_default_password(), wait=True)
    yield conn
    conn.disconnect(receipt=None)


class TestActiveMQ(object):

    def test_send_to_activemq(self, conn):
        conn.subscribe(destination="/queue/test", id=1, ack="auto")

        conn.send(body="this is a test", destination="/queue/test", content_type="text/blah", receipt="123")

        validate_send(conn)

        logging.info(conn.get_listener("testlistener").get_latest_message())