1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
|
from stomp.__main__ import StompCLI
from .testutils import *
username = get_default_user()
password = get_default_password()
(sslhost, sslport) = get_ssl_host()[0]
class TestSSLCLI(object):
def test_ssl(self):
teststdin = StubStdin()
teststdout = StubStdout(self)
teststdout.expect("CONNECTED")
cli = StompCLI(sslhost, sslport, username, password, "1.0", use_ssl=True, stdin=teststdin, stdout=teststdout)
time.sleep(3)
teststdout.expect("Subscribing to '/queue/testsubscribe' with acknowledge set to 'auto', id set to '1'")
cli.onecmd("subscribe /queue/testsubscribe")
teststdout.expect("MESSAGE")
teststdout.expect("this is a test")
cli.onecmd("send /queue/testsubscribe this is a test")
time.sleep(3)
teststdout.expect("Unsubscribing from '/queue/testsubscribe'")
cli.onecmd("unsubscribe /queue/testsubscribe")
teststdout.expect("Shutting down, please wait")
cli.onecmd("quit")
|