1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227
|
#!/usr/bin/env python3
# Run the message sequences described by the JSON files in data/ against a broker and check each expected response.
from mosq_test_helper import *
import importlib
from os import walk
import socket
import json
from collections import deque
import mosq_test
# Action codes stored in SingleMsg.action; MsgSequence dispatches on them.
(
    send,                # write the step's bytes to the test socket
    recv,                # read from the test socket and compare with the step's bytes
    disconnected_check,  # verify the test socket has been closed
    connected_check,     # verify the test socket is still usable
    publish,             # publish a message through a separate helper client
) = range(1, 6)
class SingleMsg(object):
    """A single step of a message sequence.

    Attributes:
        action: One of the module-level action codes (send, recv, ...).
        message: Data for the step; what it holds depends on the action.
        comment: Free-text label, used in failure output for recv steps.
    """

    __slots__ = ('action', 'message', 'comment')

    def __init__(self, action, message, comment=''):
        self.action, self.message, self.comment = action, message, comment
class MsgSequence(object):
    """An ordered list of steps to run against one broker connection.

    Steps are queued with the ``add_*`` methods and consumed from the front
    of the queue by ``process_next`` / ``process_all``.

    Args:
        name: Name of the sequence; also passed to ``mosq_test.gen_connect``
            as its first argument when the default connect is queued.
        default_connect: When true, queue a CONNECT to send and the matching
            successful CONNACK to receive before any other step.
        proto_ver: Protocol version used for the default CONNECT/CONNACK.
        expect_disconnect: Whether the broker is expected to have closed the
            connection once every step has run.
    """

    __slots__ = 'name', 'msgs', 'expect_disconnect'

    def __init__(self, name, default_connect=True, proto_ver=4, expect_disconnect=True):
        self.name = name
        self.msgs = deque()
        self.expect_disconnect = expect_disconnect
        if default_connect:
            self.add_default_connect(proto_ver=proto_ver)

    def add_default_connect(self, proto_ver):
        """Queue a CONNECT (keepalive 60) and the rc=0 CONNACK expected back."""
        self.add_send(mosq_test.gen_connect(self.name, keepalive=60, proto_ver=proto_ver))
        self.add_recv(mosq_test.gen_connack(rc=0, proto_ver=proto_ver), "default connack")

    def add_send(self, message):
        """Queue raw bytes to be written to the test socket."""
        self._add(send, message)

    def add_recv(self, message, comment):
        """Queue raw bytes that must be read back from the test socket."""
        self._add(recv, message, comment)

    def add_publish(self, message, comment):
        """Queue a publish made by a separate helper client.

        ``message`` is a mapping read with the keys 'qos', 'topic' and
        'payload'.
        """
        self._add(publish, message, comment)

    def add_connected_check(self):
        """Queue a check that the test socket is still connected."""
        self._add(connected_check, b"")

    def add_disconnected_check(self):
        """Queue a check that the test socket has been disconnected."""
        self._add(disconnected_check, b"")

    def _add(self, action, message, comment=""):
        """Append one step to the end of the queue."""
        self.msgs.append(SingleMsg(action, message, comment))

    def _connected_check(self, sock):
        """Ping the broker; raise ValueError if mosq_test reports a failure."""
        try:
            mosq_test.do_ping(sock)
        except mosq_test.TestError as err:
            raise ValueError("connection failed") from err

    def _send_message(self, sock, msg):
        """Write the step's bytes to the test socket."""
        sock.send(msg.message)

    @staticmethod
    def _broker_address(sock):
        """Return the (host, port) the test socket is connected to.

        Falls back to ("localhost", 1888) when the peer address cannot be
        determined (socket no longer connected, or an object without
        ``getpeername``).
        """
        fallback = ("localhost", 1888)
        try:
            peer = sock.getpeername()
        except (OSError, AttributeError):
            return fallback
        # IPv4 peers are (host, port); IPv6 peers carry two extra fields.
        if isinstance(peer, tuple) and len(peer) >= 2:
            return peer[0], peer[1]
        return fallback

    def _publish_message(self, msg, hostname="localhost", port=1888):
        """Publish ``msg.message`` through a short-lived helper client.

        The helper connects, completes the handshake for the requested QoS
        (PUBACK for QoS 1, PUBREC/PUBREL/PUBCOMP for QoS 2) and is always
        closed, including when an expected packet does not arrive.

        Args:
            msg: Step whose ``message`` mapping has 'qos', 'topic', 'payload'.
            hostname: Broker host for the helper connection.
            port: Broker port for the helper connection.
        """
        sock = mosq_test.client_connect_only(hostname=hostname, port=port, timeout=2)
        try:
            sock.send(mosq_test.gen_connect("helper", keepalive=60))
            mosq_test.expect_packet(sock, "connack", mosq_test.gen_connack(rc=0))

            m = msg.message
            if m['qos'] == 0:
                sock.send(mosq_test.gen_publish(topic=m['topic'], payload=m['payload']))
            elif m['qos'] == 1:
                sock.send(mosq_test.gen_publish(mid=1, qos=1, topic=m['topic'], payload=m['payload']))
                mosq_test.expect_packet(sock, "helper puback", mosq_test.gen_puback(mid=1))
            elif m['qos'] == 2:
                sock.send(mosq_test.gen_publish(mid=1, qos=2, topic=m['topic'], payload=m['payload']))
                mosq_test.expect_packet(sock, "helper pubrec", mosq_test.gen_pubrec(mid=1))
                sock.send(mosq_test.gen_pubrel(mid=1))
                mosq_test.expect_packet(sock, "helper pubcomp", mosq_test.gen_pubcomp(mid=1))
        finally:
            sock.close()

    def _recv_message(self, sock, msg):
        """Read the expected number of bytes and compare them with the step.

        A stream socket may deliver the bytes in several pieces, so reading
        continues until the expected length is reached, the peer closes the
        connection, or a timeout occurs after partial data.

        Raises:
            ValueError: The bytes received differ from ``msg.message``.
            socket.timeout: Nothing at all arrived before the socket timeout.
        """
        expected = msg.message
        received = b""
        while True:
            try:
                chunk = sock.recv(len(expected) - len(received))
            except socket.timeout:
                if not received:
                    raise
                # Report the partial data below instead of a bare timeout.
                break
            received += chunk
            if not chunk or len(received) >= len(expected):
                break
        if received != expected:
            raise ValueError("Receive message %s | %s | %s" % (msg.comment, received, expected))

    def _disconnected_check(self, sock):
        """Check that the broker closed the connection.

        Raises:
            ValueError: A byte was read although a disconnect was expected.
            ConnectionResetError: The connection was reset although no
                disconnect was expected.
        """
        try:
            data = sock.recv(1)
        except ConnectionResetError:
            # A reset counts as a disconnect, which is fine only if expected.
            if not self.expect_disconnect:
                raise
            return
        if len(data) == 1 and self.expect_disconnect:
            raise ValueError("Still connected")

    def _process_message(self, sock, msg):
        """Run one step, dispatching on its action code."""
        if msg.action == send:
            self._send_message(sock, msg)
        elif msg.action == recv:
            self._recv_message(sock, msg)
        elif msg.action == publish:
            # Publish to the broker the test socket is talking to, not to a
            # fixed port, so non-default test ports work.
            host, port = self._broker_address(sock)
            self._publish_message(msg, hostname=host, port=port)
        elif msg.action == disconnected_check:
            self._disconnected_check(sock)
        elif msg.action == connected_check:
            self._connected_check(sock)

    def process_next(self, sock):
        """Remove the oldest queued step and run it."""
        msg = self.msgs.popleft()
        self._process_message(sock, msg)

    def process_all(self, sock):
        """Run every queued step, then verify the final connection state."""
        while self.msgs:
            self.process_next(sock)
        if self.expect_disconnect:
            self._disconnected_check(sock)
        else:
            self._connected_check(sock)
def do_test(hostname, port):
    """Run every message-sequence test found in the ``data`` directory.

    Each ``*.json`` file directly inside ``data`` (relative to the current
    working directory) holds a list of groups. A group has a "group" name,
    an optional "disable" flag and a list of "tests". A test has a "name",
    a "msgs" list and optional "ver", "connect" and "expect_disconnect"
    settings (defaults 4, True and True). One line is printed per test,
    green on success and red on failure, followed by a summary.

    Args:
        hostname: Broker host each test connects to.
        port: Broker port each test connects to.

    Returns:
        0 if every test that ran succeeded, otherwise 1.
    """
    rc = 0

    # Only the files directly inside data/ are considered, not subdirectories.
    sequences = []
    for (_, _, filenames) in walk("data"):
        sequences.extend(filenames)
        break

    total = 0
    succeeded = 0
    for seq in sorted(sequences):
        if not seq.endswith(".json"):
            continue

        with open("data/" + seq, "r") as f:
            test_file = json.load(f)

        for g in test_file:
            group_name = g["group"]
            if g.get("disable"):
                continue

            for t in g["tests"]:
                tname = group_name + " " + t["name"]
                this_test = MsgSequence(tname,
                                        proto_ver=t.get("ver", 4),
                                        expect_disconnect=t.get("expect_disconnect", True),
                                        default_connect=t.get("connect", True))

                for m in t["msgs"]:
                    c = m.get("comment", "")
                    if m["type"] == "send":
                        this_test.add_send(bytes.fromhex(m["payload"].replace(" ", "")))
                    elif m["type"] == "recv":
                        this_test.add_recv(bytes.fromhex(m["payload"].replace(" ", "")), c)
                    elif m["type"] == "publish":
                        # The helper publisher reads qos/topic/payload from m itself.
                        this_test.add_publish(m, c)

                total += 1
                sock = None
                try:
                    sock = mosq_test.client_connect_only(hostname=hostname, port=port, timeout=2)
                    this_test.process_all(sock)
                    print("\033[32m" + tname + "\033[0m")
                    succeeded += 1
                except (ValueError, ConnectionResetError, socket.timeout, mosq_test.TestError) as e:
                    print("\033[31m" + tname + " failed: " + str(e) + "\033[0m")
                    rc = 1
                finally:
                    # Close the per-test connection so sockets do not
                    # accumulate across the whole run.
                    if sock is not None:
                        sock.close()

    print("%d tests total\n%d tests succeeded" % (total, succeeded))
    return rc
# Script body: start a broker, run all sequences against it, then shut the
# broker down and report failure through the process exit status.
hostname = "localhost"
port = mosq_test.get_port()
broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port, nolog=True)

rc = 0
try:
    rc = do_test(hostname=hostname, port=port)
finally:
    broker.terminate()
    # communicate() reads any piped output and waits for the process to exit.
    # A separate wait() beforehand is redundant and, per the subprocess docs,
    # can deadlock when the child's output is piped.
    # NOTE(review): assumes start_broker returns a subprocess.Popen - confirm.
    (stdo, stde) = broker.communicate()

if broker.returncode != 0:
    rc = broker.returncode
    print(f"Broker exited with code {rc}. If there are no obvious errors this may be due to an ASAN build having leaks, which must be fixed.")
    print("The easiest way to reproduce this is to run the broker with `mosquitto -p 1888`, rerun the test, then quit the broker.")
if rc:
    #print(stde.decode('utf-8'))
    # SystemExit is raised directly; exit() is a convenience added by the
    # site module and is not guaranteed to exist.
    raise SystemExit(rc)
|