File: test_artemis.py

package info (click to toggle)
python-stomp 8.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 632 kB
  • sloc: python: 4,176; makefile: 248; xml: 42; sh: 1
file content (39 lines) | stat: -rw-r--r-- 1,258 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import stomp
from stomp.listener import TestListener
from .testutils import *


@pytest.fixture()
def conn():
    """Provide a connected ``stomp.Connection11`` for a single test.

    The connection targets the host returned by ``get_artemis_host()`` and
    logs in with ``get_artemis_user()`` / ``get_artemis_password()``. A
    ``TestListener`` is registered under the name ``"testlistener"`` before
    connecting. Once the test has finished, the connection is disconnected
    without requesting a receipt.
    """
    connection = stomp.Connection11(get_artemis_host())

    # "123" is presumably the receipt id the listener watches for; it matches
    # the receipt used by the test that consumes this fixture -- confirm
    # against the TestListener implementation.
    listener = TestListener("123", print_to_log=True)
    connection.set_listener("testlistener", listener)

    user = get_artemis_user()
    password = get_artemis_password()
    connection.connect(user, password, wait=True)

    yield connection

    # Teardown: runs after the test body has completed.
    connection.disconnect(receipt=None)


@pytest.fixture()
def conn2():
    """Provide a connected ``stomp.Connection11`` that sends a window-size header.

    Works like a plain connection fixture, except that the CONNECT call
    carries ``{'consumerWindowSize': 0}`` as extra headers and the registered
    ``TestListener`` is created with ``"456"``. The connection is
    disconnected without requesting a receipt once the test has finished.
    """
    connection = stomp.Connection11(get_artemis_host())

    # "456" is presumably the receipt id the listener watches for; it matches
    # the receipt used by the prefetch test -- confirm against TestListener.
    listener = TestListener("456", print_to_log=True)
    connection.set_listener("testlistener", listener)

    user = get_artemis_user()
    password = get_artemis_password()
    # NOTE(review): the header looks intended to turn off broker-side
    # prefetching for this client; whether Artemis honours it on CONNECT
    # under this exact name should be verified against the broker docs.
    connection.connect(user, password, wait=True, headers={'consumerWindowSize': 0})

    yield connection

    # Teardown: runs after the test body has completed.
    connection.disconnect(receipt=None)


class TestArtemis(object):
    """Round-trip send/receive checks against an Artemis broker.

    Each test subscribes to a queue, sends one message to that same queue
    with a receipt id, and hands the connection to ``validate_send`` for
    verification.
    """

    def test_send_to_artemis(self, conn):
        """Subscribe to ``/queue/test``, send one message, and validate it."""
        queue = "/queue/test"
        payload = "this is a test"

        conn.subscribe(destination=queue, id=1, ack="auto")
        conn.send(body=payload, destination=queue, receipt="123")

        validate_send(conn)

    def test_prefetchsize(self, conn2):
        """Repeat the round trip with ``consumerWindowSize`` set to 0 on subscribe."""
        queue = "/queue/test2"
        payload = "testing sending a message after subscribing with prefetch"

        # The same header is also supplied at connect time by the conn2 fixture.
        conn2.subscribe(
            destination=queue,
            id=2,
            ack="auto",
            headers={'consumerWindowSize': 0},
        )
        conn2.send(body=payload, destination=queue, receipt="456")

        validate_send(conn2)