File: test_ssl_sni.py

package info (click to toggle)
python-stomp 8.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 632 kB
  • sloc: python: 4,176; makefile: 248; xml: 42; sh: 1
file content (41 lines) | stat: -rw-r--r-- 1,659 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import socket
import stomp
from stomp.listener import TestListener
from .testutils import *


class TestSNIMQSend(object):
    """
    To test SNI:

    - Start the docker container

    Connections with SNI to "my.example.com" will be routed to the STOMP server on port 62613.
    Connections without SNI won't be routed.

    """

    def testconnect(self, monkeypatch):
        def getaddrinfo_fake(host, port, *args, **kw):
            """Always return the IP address of the container."""
            return [(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP, '', ('172.17.0.2', port))]
        monkeypatch.setattr(socket, "getaddrinfo", getaddrinfo_fake)
        if not is_inside_travis():
            logging.info("running ipv6 test")
            receipt_id = str(uuid.uuid4())
            conn = stomp.Connection11(get_sni_ssl_host())
            conn.set_ssl(get_sni_ssl_host())
            listener = TestListener(receipt_id, print_to_log=True)
            conn.set_listener('', listener)
            conn.connect(get_default_user(), get_default_password(), wait=True)
            conn.subscribe(destination="/queue/test", id=1, ack="auto")

            logging.info("sending message with receipt %s" % receipt_id)
            conn.send(body="this is a test", destination="/queue/test", receipt=receipt_id)

            listener.wait_for_message()
            conn.disconnect(receipt=None)

            assert listener.connections == 1, "should have received 1 connection acknowledgement"
            assert listener.messages >= 1, "should have received 1 message"
            assert listener.errors == 0, "should not have received any errors"