File: mqtt.uts

package info (click to toggle)
scapy 2.6.1%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 13,956 kB
  • sloc: python: 163,618; sh: 90; makefile: 11
file content (189 lines) | stat: -rw-r--r-- 5,187 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# MQTT layer unit tests
# Copyright (C) Santiago Hernandez Ramos <shramos@protonmail.com>
#
# Type the following command to launch start the tests:
# $ test/run_tests -P "load_contrib('mqtt')" -t test/contrib/mqtt.uts

+ Syntax check
= Import the MQTT layer
from scapy.contrib.mqtt import *


+ MQTT protocol test

= MQTTPublish, packet instantiation
p = MQTT()/MQTTPublish(topic='test1',value='test2')
assert p.type == 3
assert p.topic == b'test1'
assert p.value == b'test2'
assert p.len == None
assert p.length == None

= Fixed header and MQTTPublish, packet dissection
s = b'0\n\x00\x04testtest'
publish = MQTT(s)
assert publish.type == 3
assert publish.QOS == 0
assert publish.DUP == 0
assert publish.RETAIN == 0
assert publish.len == 10
assert publish[MQTTPublish].length == 4
assert publish[MQTTPublish].topic == b'test'
assert publish[MQTTPublish].value == b'test'

= MQTTPublish

topicC = "testtopic/command"

p1 = MQTT(
            QOS=1
        ) / MQTTPublish(
            topic=topicC,
            msgid=1234,
            value="msg1"
        )
p2 = MQTT(
            QOS=1
        ) / MQTTPublish(
            topic=topicC,
            msgid=1235,
            value="msg2"
        )

p = MQTT(raw(p1 / p2))
assert p[1].msgid == 1234

= MQTTConnect, packet instantiation
c = MQTT()/MQTTConnect(clientIdlen=5, clientId='newid')
assert c.type == 1
assert c.clientId == b'newid'
assert c.clientIdlen == 5

= MQTTConnect, packet dissection
s = b'\x10\x1f\x00\x06MQIsdp\x03\x02\x00<\x00\x11mosqpub/1440-kali'
connect = MQTT(s)
assert connect.length == 6
assert connect.protoname == b'MQIsdp'
assert connect.protolevel == 3
assert connect.usernameflag == 0
assert connect.passwordflag == 0
assert connect.willretainflag == 0
assert connect.willQOSflag == 0
assert connect.willflag == 0
assert connect.cleansess == 1
assert connect.reserved == 0
assert connect.klive == 60
assert connect.clientIdlen == 17
assert connect.clientId == b'mosqpub/1440-kali'

= MQTTDisconnect
mr = raw(MQTT()/MQTTDisconnect())                                                                                           
dc= MQTT(mr)                                                                                                                
assert dc.type == 14 

=MQTTConnack, packet instantiation
ck = MQTT()/MQTTConnack(sessPresentFlag=1,retcode=0)
assert ck.type == 2
assert ck.sessPresentFlag == 1
assert ck.retcode == 0

= MQTTConnack, packet dissection
s = b' \x02\x00\x00'
connack = MQTT(s)
assert connack.sessPresentFlag == 0
assert connack.retcode == 0


= MQTTSubscribe, packet instantiation
sb = MQTT()/MQTTSubscribe(msgid=1, topics=[MQTTTopicQOS(topic='newtopic', QOS=1, length=0)])
assert sb.type == 8
assert sb.msgid == 1
assert sb.topics[0].topic == b'newtopic'
assert sb.topics[0].length == 0
assert sb[MQTTSubscribe][MQTTTopicQOS].QOS == 1

= MQTTSubscribe, packet dissection
s = b'\x82\t\x00\x01\x00\x04test\x01'
subscribe = MQTT(s)
assert subscribe.msgid == 1
assert subscribe.topics[0].length == 4
assert subscribe.topics[0].topic == b'test'
assert subscribe.topics[0].QOS == 1


= MQTTSuback, packet instantiation
sk = MQTT()/MQTTSuback(msgid=1, retcodes=[0])
assert sk.type == 9
assert sk.msgid == 1
assert sk.retcodes == [0]

= MQTTSuback, packet dissection
s = b'\x90\x03\x00\x01\x00'
suback = MQTT(s)
assert suback.msgid == 1
assert suback.retcodes == [0]

s = b'\x90\x03\x00\x01\x00\x01'
suback = MQTT(s)
assert suback.msgid == 1
assert suback.retcodes == [0, 1]

= MQTTUnsubscribe, packet instantiation
unsb = MQTT()/MQTTUnsubscribe(msgid=1, topics=[MQTTTopic(topic='newtopic',length=0)])
assert unsb.type == 10
assert unsb.msgid == 1
assert unsb.topics[0].topic == b'newtopic'
assert unsb.topics[0].length == 0

= MQTTUnsubscribe, packet dissection
u = b'\xA2\x09\x00\x01\x00\x03\x61\x2F\x62'
unsubscribe = MQTT(u)
assert unsubscribe.msgid == 1
assert unsubscribe.topics[0].length == 3
assert unsubscribe.topics[0].topic == b'a/b'

= MQTTUnsuback, packet instantiation
unsk = MQTT()/MQTTUnsuback(msgid=1)
assert unsk.type == 11
assert unsk.msgid == 1

= MQTTUnsuback, packet dissection
u = b'\xb0\x02\x00\x01'
unsuback = MQTT(u)
assert unsuback.type == 11
assert unsuback.msgid == 1

= MQTTPubrec, packet instantiation
pc = MQTT()/MQTTPubrec(msgid=1)
assert pc.type == 5
assert pc.msgid == 1

= MQTTPubrec packet dissection
s = b'P\x02\x00\x01'
pubrec = MQTT(s)
assert pubrec.msgid == 1

= MQTTPublish, long value
p = MQTT()/MQTTPublish(topic='test1',value='a'*200)
assert bytes(p)
assert p.type == 3
assert p.topic == b'test1'
assert p.value == b'a'*200
assert p.len == None
assert p.length == None

= MQTT without payload
p = MQTT()
assert bytes(p) == b'\x10\x00'

= MQTT RandVariableFieldLen
assert type(MQTT().fieldtype['len'].randval()) == RandVariableFieldLen
assert type(MQTT().fieldtype['len'].randval() + 0) == int

= MQTTUnsubscribe
u = MQTT(b'\xA2\x0C\x00\x01\x00\x03\x61\x2F\x62\x00\x03\x63\x2F\x64')
assert MQTTUnsubscribe in u and len(u.topics) == 2 and u.topics[1].topic == b"c/d"

= MQTTSubscribe
u = MQTT(b'\x82\x10\x00\x01\x00\x03\x61\x2F\x62\x02\x00\x03\x63\x2F\x64\x00')
assert MQTTSubscribe in u and len(u.topics) == 2 and u.topics[1].topic == b"c/d"