File: test_ipv6.py

package info (click to toggle)
python-stomp 8.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 632 kB
  • sloc: python: 4,176; makefile: 248; xml: 42; sh: 1
file content (30 lines) | stat: -rw-r--r-- 878 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import stomp
from stomp.listener import TestListener
from stomp import logging
from .testutils import *



@pytest.fixture()
def conn():
    if not is_inside_travis():
        conn = stomp.Connection11(get_ipv6_host())
        conn.set_listener("testlistener", TestListener("123", print_to_log=True))
        conn.connect("admin", "password", wait=True)
        yield conn
        conn.disconnect(receipt=None)
    else:
        yield None


class TestIP6(object):
    def test_ipv6_send(self, conn):
        if not is_inside_travis():
            logging.info("running ipv6 test")
            timestamp = time.strftime("%Y%m%d%H%M%S")
            queuename = "/queue/testipv6-%s" % timestamp
            conn.subscribe(destination=queuename, id=1, ack="auto")

            conn.send(body="this is a test", destination=queuename, receipt="123")

            validate_send(conn)