File: test_override_threading.py

package info (click to toggle)
python-stomp 8.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 632 kB
  • sloc: python: 4,176; makefile: 248; xml: 42; sh: 1
file content (61 lines) | stat: -rw-r--r-- 1,665 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
from concurrent.futures import ThreadPoolExecutor

import stomp
from stomp.listener import TestListener
from .testutils import *
from stomp import logging


# Module-level thread pool (default worker count) that create_thread() submits
# callables to; it is created at import time and shared by every test here.
executor = ThreadPoolExecutor()


def create_thread(fc):
    """Run ``fc`` on the module-level ``executor`` instead of a new thread.

    The submission is reported on stdout so a test run shows that this
    function was used to start the work.

    :param fc: zero-argument callable to execute on the pool.
    :return: the future returned by ``executor.submit(fc)``.
    """
    future = executor.submit(fc)
    message = "Created future %s on executor %s" % (future, executor)
    print(message)
    return future


class ReconnectListener(TestListener):
    """Test listener that reconnects once when the receiver loop ends.

    The listener keeps a reference to the connection it was created with.
    The first time ``on_receiver_loop_ended`` fires, it drops that reference,
    reconnects the connection and disconnects it again; later notifications
    are ignored because the reference is already cleared.
    """

    def __init__(self, conn):
        """Initialise the base listener and remember ``conn``.

        :param conn: connection object to reconnect on receiver-loop end.
        """
        # NOTE(review): "123" / True are presumably the expected receipt id and
        # a print-to-log flag -- confirm against TestListener's signature.
        super().__init__("123", True)
        self.conn = conn

    def on_receiver_loop_ended(self, *args):
        """Reconnect and disconnect the stored connection, at most once.

        :param args: ignored.
        """
        pending = self.conn
        if not pending:
            return
        # Clear the reference before reconnecting so that any further
        # receiver-loop-ended notification finds nothing left to do.
        self.conn = None
        pending.connect(get_default_user(), get_default_password(), wait=True)
        pending.disconnect()


@pytest.fixture
def conn():
    """Yield a connected ``stomp.Connection`` that starts threads via ``create_thread``.

    Before connecting, the transport's thread creation is overridden with
    ``create_thread`` and a ``ReconnectListener`` is registered under the name
    ``"testlistener"``. Nothing runs after the ``yield``, so the test using
    this fixture is responsible for disconnecting.
    """
    connection = stomp.Connection(get_default_host())

    # check thread override here
    connection.transport.override_threading(create_thread)

    connection.set_listener("testlistener", ReconnectListener(connection))
    connection.connect(get_default_user(), get_default_password(), wait=True)
    yield connection


class TestThreadingOverride:
    """Send, disconnect and reconnect on the connection supplied by the ``conn`` fixture."""

    def test_threading(self, conn):
        """Send one message, then disconnect, reconnect and disconnect again.

        :param conn: connected connection provided by the ``conn`` fixture.
        """
        test_listener = conn.get_listener("testlistener")
        # Queue name is suffixed with the listener's timestamp attribute.
        destination = "/queue/test1-%s" % test_listener.timestamp
        conn.subscribe(destination=destination, id=1, ack="auto")

        conn.send(body="this is a test", destination=destination, receipt="123")

        # NOTE(review): the 1, 1, 0 arguments are presumably expected listener
        # counts -- confirm against validate_send in testutils.
        validate_send(conn, 1, 1, 0)

        logging.info("first disconnect")
        conn.disconnect(receipt="112233")

        logging.info("reconnecting")
        credentials = (get_default_user(), get_default_password())
        conn.connect(*credentials, wait=True)

        logging.info("second disconnect")
        conn.disconnect()