File: linux.uts

package info (click to toggle)
scapy 2.6.1%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 13,956 kB
  • sloc: python: 163,618; sh: 90; makefile: 11
file content (416 lines) | stat: -rw-r--r-- 12,265 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
% Regression tests for Linux only

# More information at http://www.secdev.org/projects/UTscapy/


############
############

+ Linux only test

= L3RawSocket
~ netaccess IP TCP linux needs_root

with no_debug_dissector():
    x = sr1(IP(dst="www.google.com")/TCP(sport=RandShort(), dport=80, flags="S"),timeout=3)

x
assert x[IP].ottl() in [32, 64, 128, 255]
assert 0 <= x[IP].hops() <= 126

# TODO: fix this test (randomly stuck)
# ex: https://travis-ci.org/secdev/scapy/jobs/247473497

#= Supersocket _flush_fd
#~ needs_root linux
#
#import select
#
#from scapy.arch.linux import _flush_fd
#socket = conf.L2listen()
#select.select([socket],[],[],2)
#_flush_fd(socket.ins)

= Interface aliases & sub-interfaces
~ linux needs_root

import os
exit_status = os.system("ip link add name scapy0 type dummy")
exit_status = os.system("ip addr add 192.0.2.1/24 dev scapy0")
exit_status = os.system("ip link set scapy0 up")
exit_status = os.system("ifconfig scapy0:0 inet 198.51.100.1/24 up")
if exit_status == 0:
    exit_status = os.system("ip addr show scapy0")
    print(get_if_list())
    conf.route.resync()
    print(conf.route.routes)
    assert conf.route.route("198.51.100.254") == ("scapy0", "198.51.100.1", "0.0.0.0")
    route_alias = (3325256704, 4294967040, "0.0.0.0", "scapy0", "198.51.100.1", 0)
    assert route_alias in conf.route.routes
    exit_status = os.system("ip link add link scapy0 name scapy0.42 type vlan id 42")
    exit_status = os.system("ip addr add 203.0.113.42/24 dev scapy0.42")
    exit_status = os.system("ip link set scapy0.42 up")
    exit_status = os.system("ip route add 192.0.2.43/32 via 203.0.113.41")
    print(get_if_list())
    conf.route.resync()
    print(conf.route.routes)
    assert conf.route.route("192.0.2.43") == ("scapy0.42", "203.0.113.42", "203.0.113.41")
    route_specific = (3221226027, 4294967295, "203.0.113.41", "scapy0.42", "0.0.0.0", 0)
    assert route_specific in conf.route.routes
    assert conf.route.route("203.0.113.42") == ('scapy0.42', '203.0.113.42', '0.0.0.0')
    assert conf.route.route("203.0.113.43") == ('scapy0.42', '203.0.113.42', '0.0.0.0')
    exit_status = os.system("ip link del name dev scapy0")
else:
    assert True


= Test scoped interface addresses
~ linux needs_root

import os
exit_status = os.system("ip link add name scapy0 type dummy")
exit_status = os.system("ip link add name scapy1 type dummy")
exit_status |= os.system("ip addr add 192.0.2.1/24 dev scapy0")
exit_status |= os.system("ip addr add 192.0.3.1/24 dev scapy1")
exit_status |= os.system("ip link set scapy0 address 00:01:02:03:04:05 multicast on up")
exit_status |= os.system("ip link set scapy1 address 06:07:08:09:10:11 multicast on up")
assert exit_status == 0

conf.ifaces.reload()
conf.route.resync()
conf.route6.resync()

conf.route6

try:
    # IPv4
    a = Ether()/IP(dst="224.0.0.1%scapy0")
    assert a[Ether].src == "00:01:02:03:04:05"
    assert a[IP].src == "192.0.2.1"
    b = Ether()/IP(dst="224.0.0.1%scapy1")
    assert b[Ether].src == "06:07:08:09:10:11"
    assert b[IP].src == "192.0.3.1"
    c = Ether()/IP(dst="224.0.0.1/24%scapy1")
    assert c[Ether].src == "06:07:08:09:10:11"
    assert c[IP].src == "192.0.3.1"
    # IPv6
    a = Ether()/IPv6(dst="ff02::fb%scapy0")
    assert a[Ether].src == "00:01:02:03:04:05"
    assert a[IPv6].src == "fe80::201:2ff:fe03:405"
    b = Ether()/IPv6(dst="ff02::fb%scapy1")
    assert b[Ether].src == "06:07:08:09:10:11"
    assert b[IPv6].src == "fe80::407:8ff:fe09:1011"
    c = Ether()/IPv6(dst="ff02::fb/30%scapy1")
    assert c[Ether].src == "06:07:08:09:10:11"
    assert c[IPv6].src == "fe80::407:8ff:fe09:1011"
finally:
    exit_status = os.system("ip link del scapy0")
    exit_status = os.system("ip link del scapy1")
    conf.ifaces.reload()
    conf.route.resync()
    conf.route6.resync()

= catch loopback device missing
~ linux needs_root

from unittest.mock import patch

# can't remove the lo device (or its address without causing trouble) - use some pseudo dummy instead

with patch('scapy.arch.linux.conf.loopback_name', 'scapy_lo_x'):
    routes = read_routes()

= catch loopback device no address assigned
~ linux needs_root

import os, socket
from unittest.mock import patch

try:
    exit_status = os.system("ip link add name scapy_lo type dummy")
    assert exit_status == 0
    exit_status = os.system("ip link set dev scapy_lo up")
    assert exit_status == 0
    
    with patch('scapy.arch.linux.conf.loopback_name', 'scapy_lo'):
        routes = read_routes()
    
    exit_status = os.system("ip addr add dev scapy_lo 10.10.0.1/24")
    assert exit_status == 0
    
    with patch('scapy.arch.linux.conf.loopback_name', 'scapy_lo'):
        routes = read_routes()
    
    lo_routes = [
        (ltoa(dst_int), ltoa(msk_int), gw_str, if_name, if_addr, metric)
        for dst_int, msk_int, gw_str, if_name, if_addr, metric in routes
        if if_name == "scapy_lo"
    ]
    lo_routes.sort(key=lambda x: x[0])
    
    expected_routes = [
        (168427520, 4294967040, '0.0.0.0', 'scapy_lo', '10.10.0.1', 0),
        (168427521, 4294967295, '0.0.0.0', 'scapy_lo', '10.10.0.1', 0),
        (168427775, 4294967295, '0.0.0.0', 'scapy_lo', '10.10.0.1', 0),
    ]
    print(lo_routes)
    print(expected_routes)
finally:
    exit_status = os.system("ip link del dev scapy_lo")
    assert exit_status == 0

= IPv6 link-local address selection

conf.ifaces._add_fake_iface("scapy0", 'e2:39:91:79:19:10')

from unittest.mock import patch
conf.route6.routes =  [('fe80::', 64, '::', 'scapy0', ['fe80::e039:91ff:fe79:1910'], 256)]
conf.route6.ipv6_ifaces = set(['scapy0'])
bck_conf_iface = conf.iface
conf.iface = "scapy0"

p = Ether()/IPv6(dst="ff02::1")/ICMPv6NIQueryName(data="ff02::1")
print(p.sprintf("%Ether.src% > %Ether.dst%\n%IPv6.src% > %IPv6.dst%"))
ip6_ll_address = 'fe80::e039:91ff:fe79:1910'
print(p[IPv6].src, ip6_ll_address)
assert p[IPv6].src == ip6_ll_address
mac_address = 'e2:39:91:79:19:10'
print(p[Ether].src, mac_address)
assert p[Ether].src == mac_address

conf.iface = bck_conf_iface
conf.route6.resync()

= IPv6 - check OS routes
~ linux ipv6

addrs = in6_getifaddr()
if addrs:
    assert all(in6_isvalid(addr[0]) for addr in in6_getifaddr()), 'invalid ipv6 address'
    ifaces6 = [addr[2] for addr in in6_getifaddr()]
    assert all(iface in ifaces6 for iface in conf.route6.ipv6_ifaces), 'ipv6 interface has route but no real'


= veth interface error handling
~ linux needs_root veth

from scapy.arch.linux import VEthPair

try:
    veth = VEthPair('this_IF_name_is_to_long_and_will_cause_an_error', 'veth_scapy_1')
    veth.setup()
    assert False
except subprocess.CalledProcessError:
    pass
except Exception:
    assert False

= veth interface usage - ctx manager
~ linux needs_root veth

from threading import Condition

cond_started = Condition()

def _sniffer_started():

    global cond_started
    cond_started.acquire()
    cond_started.notify()
    cond_started.release()

cond_started.acquire()

try:
    with VEthPair('veth_scapy_0', 'veth_scapy_1') as veth:
        if_list = get_if_list()
        assert ('veth_scapy_0' in if_list)
        assert ('veth_scapy_1' in if_list)
        frm_count = 0
        def _sniffer():
            sniffed = sniff(iface='veth_scapy_1',
                            store=True,
                            count=2,
                            lfilter=lambda p: Ether in p and p[Ether].type == 0xbeef,
                            started_callback=_sniffer_started,
                            timeout=3)
            global frm_count
            frm_count = 2
        t_sniffer = Thread(target=_sniffer, name="linux.uts sniff veth_scapy_1")
        t_sniffer.start()
        cond_started.wait()
        sendp(Ether(type=0xbeef)/Raw(b'0123456789'),
              iface='veth_scapy_0',
              count=2)
        t_sniffer.join(1)
        assert frm_count == 2

    if_list = get_if_list()
    assert ('veth_scapy_0' not in if_list)
    assert ('veth_scapy_1' not in if_list)
except subprocess.CalledProcessError:
    assert False
except Exception:
    assert False

= veth interface usage - manual interface handling
~ linux needs_root veth

from threading import Condition

cond_started = Condition()

def _sniffer_started():

    global cond_started
    cond_started.acquire()
    cond_started.notify()
    cond_started.release()

cond_started.acquire()


veth = VEthPair('veth_scapy_0', 'veth_scapy_1')
try:
    veth.setup()
    veth.up()
except subprocess.CalledProcessError:
    assert False
except Exception:
    assert False

conf.ifaces.reload()
if_list = get_if_list()
assert ('veth_scapy_0' in if_list)
assert ('veth_scapy_1' in if_list)

frm_count = 0
def _sniffer():
    sniffed = sniff(iface='veth_scapy_1',
                    store=True,
                    count=2,
                    lfilter=lambda p: Ether in p and p[Ether].type == 0xbeef,
                    started_callback=_sniffer_started,
                    timeout=3)
    global frm_count
    frm_count = 2

t_sniffer = Thread(target=_sniffer, name="linux.uts sniff veth_scapy_1 2")
t_sniffer.start()
cond_started.wait()
sendp(Ether(type=0xbeef)/Raw(b'0123456789'),
      iface='veth_scapy_0',
      count=2)
t_sniffer.join(1)
assert frm_count == 2

try:
    veth.down()
    veth.destroy()

    conf.ifaces.reload()
    if_list = get_if_list()
    assert ('veth_scapy_0' not in if_list)
    assert ('veth_scapy_1' not in if_list)
except subprocess.CalledProcessError:
    assert False
except Exception:
    assert False


= Routing table, interface with no names
~ linux

from unittest.mock import patch

@patch("scapy.arch.linux.ioctl")
def test_read_routes(mock_ioctl):
    def raise_ioerror(*args, **kwargs):
        if args[1] == 0x8912:
            return args[2]
        raise IOError
    mock_ioctl.side_effect = raise_ioerror
    read_routes()

test_read_routes()


= L3PacketSocket sendto exception
~ linux needs_root

from scapy.arch.linux import L3PacketSocket

import socket
from unittest import mock

@mock.patch("scapy.arch.linux.socket.socket.sendto")
def test_L3PacketSocket_sendto_python3(mock_sendto):
    mock_sendto.side_effect = OSError(22, 2807)
    l3ps = L3PacketSocket()
    l3ps.send(IP(dst="8.8.8.8")/ICMP())
    return True

assert test_L3PacketSocket_sendto_python3()

= Test _interface_selection
~ netaccess linux needs_root

import os
from scapy.sendrecv import _interface_selection
assert _interface_selection(IP(dst="8.8.8.8")/UDP()) == (conf.iface, False)
exit_status = os.system("ip link add name scapy0 type dummy")
exit_status = os.system("ip addr add 192.0.2.1/24 dev scapy0")
exit_status = os.system("ip addr add fc00::/24 dev scapy0")
exit_status = os.system("ip link set scapy0 up")
conf.ifaces.reload()
conf.route.resync()
conf.route6.resync()
assert _interface_selection(IP(dst="192.0.2.42")/UDP()) == ("scapy0", False)
assert _interface_selection(IPv6(dst="fc00::ae0d")/UDP()) == ("scapy0", True)
exit_status = os.system("ip link del name dev scapy0")
conf.ifaces.reload()
conf.route.resync()
conf.route6.resync()

= Test 802.1Q sniffing
~ linux needs_root veth

from scapy.arch.linux import VEthPair
from threading import Thread, Condition

def _send():
    sendp(Ether()/IP(dst="198.51.100.2")/ICMP(), iface='vlanleft0', count=2)


with VEthPair("left0", "right0") as veth:
    exit_status = os.system("ip link add link right0 name vlanright0 type vlan id 42")
    exit_status = os.system("ip link add link left0 name vlanleft0 type vlan id 42")
    exit_status = os.system("ip link set vlanright0 up")
    exit_status = os.system("ip link set vlanleft0 up")
    exit_status = os.system("ip addr add 198.51.100.1/24 dev vlanleft0")
    exit_status = os.system("ip addr add 198.51.100.2/24 dev vlanright0")
    sniffer = AsyncSniffer(
        iface="right0",
        lfilter=lambda p: Dot1Q in p,
        count=2,
        timeout=5,
        started_callback=_send,
    )
    sniffer.start()
    sniffer.join(1)
    if sniffer.running:
        sniffer.stop()
        raise Scapy_Exception("Sniffer did not stop !")
    else:
        results = sniffer.results


assert len(results) == 2
assert all(Dot1Q in x for x in results)


= Reload interfaces & routes

conf.ifaces.reload()
conf.route.resync()
conf.route6.resync()