1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
|
from concurrent.futures import ThreadPoolExecutor
import stomp
from stomp.listener import TestListener
from .testutils import *
from stomp import logging
# Shared pool used by create_thread() to run the callables it is handed.
executor = ThreadPoolExecutor()
def create_thread(fc):
    """
    Submit a callable to the module-level executor and report the result.

    :param fc: the callable to run on the executor
    :return: the future returned by ``executor.submit``
    """
    future = executor.submit(fc)
    message = "Created future %s on executor %s" % (future, executor)
    print(message)
    return future
class ReconnectListener(TestListener):
    """
    Test listener that, the first time the receiver loop ends, reconnects
    the connection it was given and then disconnects it again.
    """

    def __init__(self, conn):
        """
        :param conn: the connection to reconnect when the receiver loop ends
        """
        # NOTE(review): "123" and True are forwarded positionally to
        # TestListener; presumably a receipt id and a print-to-log flag --
        # confirm against stomp.listener.TestListener.
        super(ReconnectListener, self).__init__("123", True)
        self.conn = conn

    def on_receiver_loop_ended(self, *args):
        """
        Reconnect and immediately disconnect the stored connection, once.

        :param args: ignored
        """
        if not self.conn:
            return

        # Drop the stored reference before reconnecting so that any later
        # invocation of this callback is a no-op.
        pending, self.conn = self.conn, None
        pending.connect(get_default_user(), get_default_password(), wait=True)
        pending.disconnect()
@pytest.fixture
def conn():
    """
    Yield a connected ``stomp.Connection`` whose transport has had its
    threading overridden with ``create_thread`` and which carries a
    ``ReconnectListener`` registered as "testlistener".

    Nothing runs after the yield; the fixture performs no teardown.
    """
    connection = stomp.Connection(get_default_host())

    # Install the thread override before any listener is attached or the
    # connection is opened.
    connection.transport.override_threading(create_thread)

    connection.set_listener("testlistener", ReconnectListener(connection))
    connection.connect(get_default_user(), get_default_password(), wait=True)
    yield connection
class TestThreadingOverride(object):
    """Exercise a connection created by the ``conn`` fixture."""

    def test_threading(self, conn):
        """
        Subscribe, send one message, then disconnect, reconnect and
        disconnect again on the same connection.

        :param conn: the connection supplied by the ``conn`` fixture
        """
        test_listener = conn.get_listener("testlistener")
        destination = "/queue/test1-%s" % test_listener.timestamp

        # Round-trip a single message through the queue.
        conn.subscribe(destination=destination, id=1, ack="auto")
        conn.send(body="this is a test", destination=destination, receipt="123")

        # NOTE(review): the meaning of the (1, 1, 0) arguments is defined by
        # the testutils helper -- confirm there.
        validate_send(conn, 1, 1, 0)

        # First disconnect, requesting a receipt.
        logging.info("first disconnect")
        conn.disconnect(receipt="112233")

        # Reconnect on the same connection object.
        logging.info("reconnecting")
        conn.connect(get_default_user(), get_default_password(), wait=True)

        # Final disconnect, no receipt requested.
        logging.info("second disconnect")
        conn.disconnect()
|