File: canfdsocket_native.uts

package info (click to toggle)
scapy 2.6.1%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 13,956 kB
  • sloc: python: 163,618; sh: 90; makefile: 11
file content (152 lines) | stat: -rw-r--r-- 5,655 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
% Regression tests for nativecanfdsocket
~ not_pypy vcan_socket needs_root linux

# More information at http://www.secdev.org/projects/UTscapy/


############
############
+ Configuration of CAN virtual sockets
~ conf

= Load module
# Load the CAN layer into the globals of this test session.
load_layer("can", globals_dict=globals())
# Set 'use-python-can' to False before the socket module is imported; the
# native socket module is then imported explicitly on the next line.
conf.contribs['CANSocket'] = {'use-python-can': False}
from scapy.contrib.cansocket_native import *
# Default CAN options for the tests below: no byte swapping, padding removed.
conf.contribs['CAN'] = {'swap-bytes': False, 'remove-padding': True}


= Setup string for vcan
# Shell command that loads the vcan kernel module, creates the virtual CAN
# interface vcan0 and brings it up. The three commands are joined with ';',
# so the exit status seen by the caller is that of the last command only.
bashCommand = "/bin/bash -c 'sudo modprobe vcan; sudo ip link add name vcan0 type vcan; sudo ip link set dev vcan0 up'"

= Load os
import os
import threading
from time import sleep
from subprocess import call

= Setup vcan0
# Run the vcan0 setup command and require a zero exit status.
assert os.system(bashCommand) == 0

+ Basic Packet Tests()
= CAN FD Packet init
# Build a CANFD frame and compare its serialized form byte for byte.
# NOTE(review): length=8 is passed although data holds 9 bytes, and the
# expected bytes contain the length byte 0x08 followed by all 9 data bytes -
# confirm this mismatch is intentional.
canfdframe = CANFD(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa')
assert bytes(canfdframe) == b'\x00\x00\x07\xff\x08\x04\x00\x00\x01\x02\x03\x04\x05\x06\x07\x08\xaa'

+ Basic Socket Tests()
= CAN FD Socket Init
# Socket on vcan0 opened with fd=True; the following tests call recv() on it
# to read the frames that their temporary sender sockets transmit.
sock1 = CANSocket(channel="vcan0", fd=True)

= CAN Socket send recv small packet without remove padding

# Keep the padding on received frames for this test.
conf.contribs['CAN'] = {'swap-bytes': False, 'remove-padding': False}

# A second, short-lived socket on vcan0 sends one CANFD frame with 9 data
# bytes followed by one CAN frame with 8 data bytes.
sock2 = CANSocket(channel="vcan0", fd=True)
sock2.send(CANFD(identifier=0x7ff,length=9,data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa'))
sock2.send(CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08'))
sock2.close()

# First frame: sent with length=9, expected back with length=12 and three
# zero bytes appended to the data, followed by zero padding that fills the
# data area up to 64 bytes.
# NOTE(review): presumably 9 is rounded up to the next valid CAN FD payload
# size - confirm.
rx = sock1.recv()
assert rx == CANFD(identifier=0x7ff,length=12,data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa\x00\x00\x00') / Padding(b"\x00" * (64 - 12))
rx = sock1.recv()
# Second frame (the one sent as CAN): different kernel versions deliver it
# with fd_flags 0 or 4, so both encodings are built, dumped for the test log
# and accepted by the final assertion.
hexdump(rx)
test = CANFD(identifier=0x7ff, fd_flags=0, length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') / Padding(b"\x00" * (64 - 8))
hexdump(test)
test2 = CANFD(identifier=0x7ff,fd_flags=4, length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') / Padding(b"\x00" * (64 - 8))
hexdump(test2)
assert bytes(rx) in [bytes(test), bytes(test2)]

= CAN Socket send recv

# Switch padding removal back on for this and the following tests.
conf.contribs['CAN'] = {'swap-bytes': False, 'remove-padding': True}

# Send a single CANFD frame with 9 data bytes from a temporary socket.
sock2 = CANSocket(channel="vcan0", fd=True)
sock2.send(CANFD(identifier=0x7ff,length=9,data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa'))
sock2.close()

# The frame is expected back with length=12, fd_flags=4 and three zero bytes
# appended to the data; no Padding layer is expected here.
rx = sock1.recv()
assert rx == CANFD(identifier=0x7ff,length=12, fd_flags=4, data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa\x00\x00\x00')

= CAN Socket basecls test


# Send the same 9-byte CANFD frame once more from a temporary socket.
sock2 = CANSocket(channel="vcan0", fd=True)
sock2.send(CANFD(identifier=0x7ff,length=9,data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa'))
sock2.close()

# Switch the receiving socket to Raw and compare the raw load with the bytes
# of a CANFD packet built with length=12, fd_flags=4 and a 64-byte data
# field (the 12 expected bytes followed by 52 zero bytes).
sock1.basecls = Raw
rx = sock1.recv()
assert rx.load == bytes(CANFD(identifier=0x7ff, fd_flags=4, length=12,data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa\x00\x00\x00' + b'\x00' * (64 - 12)))

= sniff with filtermask 0x1FFFFFFF and inverse filter


# The name sock1 is rebound below. Close the socket it currently refers to
# first, so that every socket opened by this campaign is closed explicitly
# instead of being left to garbage collection.
sock1.close()

# Receiver with an inverse filter on identifier 0x10000000 and a full 29-bit
# mask (CAN_INV_FILTER is OR-ed into can_id).
sock1 = CANSocket(channel='vcan0', fd=True, can_filters=[{'can_id': 0x10000000 | CAN_INV_FILTER, 'can_mask': 0x1fffffff}])

# Send six extended CANFD frames; two of them carry identifier 0x10000000.
sock2 = CANSocket(channel="vcan0", fd=True)
sock2.send(CANFD(flags='extended', identifier=0x10010000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab'))
sock2.send(CANFD(flags='extended', identifier=0x10020000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab'))
sock2.send(CANFD(flags='extended', identifier=0x10000000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab'))
sock2.send(CANFD(flags='extended', identifier=0x10030000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab'))
sock2.send(CANFD(flags='extended', identifier=0x10040000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab'))
sock2.send(CANFD(flags='extended', identifier=0x10000000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab'))
sock2.close()

# Only the four frames whose identifier differs from 0x10000000 are expected
# to reach the filtered socket.
packets = sock1.sniff(timeout=0.1, verbose=False, count=4)
assert len(packets) == 4

sock1.close()

+ bridge and sniff tests

= bridge and sniff setup vcan1 package forwarding


# Create the second virtual CAN interface vcan1 and bring it up. As with the
# vcan0 setup, the commands are joined with ';', so only the exit status of
# the last command is checked.
bashCommand = "/bin/bash -c 'sudo ip link add name vcan1 type vcan; sudo ip link set dev vcan1 up'"
assert 0 == os.system(bashCommand)

# Endpoints used by the test itself: frames are sent on sock0 (vcan0) and
# sniffed on sock1 (vcan1).
sock0 = CANSocket(channel='vcan0', fd=True)
sock1 = CANSocket(channel='vcan1', fd=True)

# Set by the bridge thread once its own sockets have been opened.
bridgeStarted = threading.Event()

def bridge():
    """Forward frames between vcan0 and vcan1 for the bridge test.
    Opens one CAN FD socket per interface, sets ``bridgeStarted`` and then
    runs ``bridge_and_sniff`` with an identity transform in both directions
    (timeout 0.2 s, at most 6 packets). Both sockets are closed on every
    exit path, including when opening the second socket or bridging raises.
    """
    def pnr(pkt):
        # Identity transform: pass every frame on unchanged.
        return pkt
    # NOTE: keep this body free of blank lines; the test runner feeds the
    # code to an interactive interpreter, where a blank line ends the def.
    bSock0 = CANSocket(channel="vcan0", fd=True)
    try:
        bSock1 = CANSocket(channel='vcan1', fd=True)
        try:
            # Tell the main thread that both bridge sockets are open.
            bridgeStarted.set()
            bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnr, xfrm21=pnr, timeout=0.2, verbose=False, count=6)
        finally:
            bSock1.close()
    finally:
        bSock0.close()

# Run the bridge in a background thread while this thread sends and sniffs.
threadBridge = threading.Thread(target=bridge)
threadBridge.start()
# Event.wait() returns False when the timeout expires. Assert on it so that a
# bridge which never came up is reported here, not as a wrong packet count.
assert bridgeStarted.wait(timeout=5), "bridge thread did not start within 5 s"
# Send six extended CANFD frames on vcan0 for the bridge to forward.
sock0.send(CANFD(flags='extended', identifier=0x10010000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x0842'))
sock0.send(CANFD(flags='extended', identifier=0x10020000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x0842'))
sock0.send(CANFD(flags='extended', identifier=0x10000000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x0842'))
sock0.send(CANFD(flags='extended', identifier=0x10030000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x0842'))
sock0.send(CANFD(flags='extended', identifier=0x10040000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x0842'))
sock0.send(CANFD(flags='extended', identifier=0x10000000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x0842'))

# All six frames are expected to show up on vcan1.
packetsVCan1 = sock1.sniff(timeout=0.1, verbose=False, count=6)
assert len(packetsVCan1) == 6

# The bridge thread must have finished within the join timeout.
threadBridge.join(timeout=5)
assert not threadBridge.is_alive()

sock1.close()
sock0.close()


= Delete vcan interfaces

# Remove both virtual CAN interfaces again; a non-zero exit status of
# "ip link delete" fails the test.
delete_rc = call(["sudo", "ip", "link", "delete", "vcan0"])
if delete_rc != 0:
    raise Exception("vcan0 could not be deleted")

delete_rc = call(["sudo", "ip", "link", "delete", "vcan1"])
if delete_rc != 0:
    raise Exception("vcan1 could not be deleted")