1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
import stomp
from stomp.listener import TestListener
from .testutils import *
@pytest.fixture()
def conn():
    """Yield a connected ``stomp.Connection11`` and disconnect it afterwards.

    The connection targets ``get_default_host()`` and logs in with
    ``get_default_user()`` / ``get_default_password()``, blocking until the
    connect call returns (``wait=True``).  A ``TestListener`` is registered
    under the name ``"testlistener"`` so tests can look it up through
    ``get_listener``.
    """
    connection = stomp.Connection11(get_default_host())

    # NOTE(review): "123" presumably is the receipt id the listener waits
    # for (the test sends with receipt="123") -- confirm against TestListener.
    listener = TestListener("123", print_to_log=True)
    connection.set_listener("testlistener", listener)

    user = get_default_user()
    password = get_default_password()
    connection.connect(user, password, wait=True)

    yield connection

    # Teardown: runs once the test using this fixture has finished.
    connection.disconnect(receipt=None)
class TestActiveMQ(object):
    """Send test run against the broker reached through the ``conn`` fixture."""

    def test_send_to_activemq(self, conn):
        """Subscribe to ``/queue/test``, send one message to it and validate.

        The message is sent with receipt ``"123"``; ``validate_send`` performs
        the actual checks, after which the latest message seen by the
        ``"testlistener"`` listener is written to the log.
        """
        queue = "/queue/test"

        conn.subscribe(destination=queue, id=1, ack="auto")
        conn.send(
            body="this is a test",
            destination=queue,
            content_type="text/blah",
            receipt="123",
        )

        validate_send(conn)

        listener = conn.get_listener("testlistener")
        logging.info(listener.get_latest_message())
|