File: canfdsocket_python_can.uts

package info (click to toggle)
scapy 2.6.1%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 13,956 kB
  • sloc: python: 163,618; sh: 90; makefile: 11
file content (177 lines) | stat: -rw-r--r-- 6,023 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
% Regression tests for the CANSocket
~ vcan_socket linux needs_root not_pypy

# More information at http://www.secdev.org/projects/UTscapy/


############
############
+ Configuration of CAN virtual sockets

= Load module
~ conf
 
conf.contribs['CAN'] = {'swap-bytes': False, 'remove-padding': True}
load_layer("can", globals_dict=globals())
conf.contribs['CANSocket'] = {'use-python-can': True}
from scapy.contrib.cansocket_python_can import *

= Setup string for vcan
~ conf command
bashCommand = "/bin/bash -c 'sudo modprobe vcan; sudo ip link add name vcan0 type vcan; sudo ip link set dev vcan0 up'"

= Load os
~ conf command

import os
import threading
from subprocess import call

= Setup vcan0
~ conf command

0 == os.system(bashCommand)

= Define common used functions

send_done = threading.Event()

def sender(sock, msg):
    if not hasattr(msg, "__iter__"):
        msg = [msg]
    for m in msg:
        sock.send(m)
    send_done.set()

+ Basic Packet Tests()
= CAN Packet init

canframe = CANFD(identifier=0x7ff,length=10,data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab')
bytes(canframe) == b'\x00\x00\x07\xff\x0c\x04\x00\x00\x01\x02\x03\x04\x05\x06\x07\x08ab\x00\x00'

+ Basic Socket Tests()
= CAN Socket Init

sock1 = CANSocket(bustype='socketcan', channel='vcan0', fd=True)
sock1.close()
del sock1
sock1 = None
assert sock1 == None

= CAN Socket send recv small packet

sock1 = CANSocket(bustype='socketcan', channel='vcan0', fd=True)
sock2 = CANSocket(bustype='socketcan', channel='vcan0', fd=True)

sock2.send(CANFD(identifier=0x7ff,length=10,data=b'\x01'*10))
sock2.send(CAN(identifier=0x7ff,length=1,data=b'\x01'))
rx1 = sock1.recv()
rx2 = sock1.recv()
sock1.close()
sock2.close()

assert rx1 == CANFD(identifier=0x7ff,length=10,data=b'\x01'*10)
assert rx2 == CAN(identifier=0x7ff,length=1,data=b'\x01')


= CAN Socket send recv small packet test with

with CANSocket(bustype='socketcan', channel='vcan0', fd=True) as sock1, \
    CANSocket(bustype='socketcan', channel='vcan0', fd=True) as sock2:
    sock2.send(CANFD(identifier=0x7ff,length=1,data=b'\x01'))
    rx = sock1.recv()

assert rx == CANFD(identifier=0x7ff,length=1,data=b'\x01')

= CAN Socket basecls test

with CANSocket(bustype='socketcan', channel='vcan0', fd=True) as sock1, \
    CANSocket(bustype='socketcan', channel='vcan0', fd=True) as sock2:
    sock1.basecls = Raw
    sock2.send(CANFD(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08'))
    rx = sock1.recv()
    assert rx == Raw(bytes(CANFD(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08')))

= CAN Socket send recv swapped

conf.contribs['CAN']['swap-bytes'] = True

with CANSocket(bustype='socketcan', channel='vcan0', fd=True) as sock1, \
    CANSocket(bustype='socketcan', channel='vcan0', fd=True) as sock2:
    sock2.send(CANFD(identifier=0x7ff,length=64,data=b'\x01' * 64))
    sock1.basecls = CAN
    rx = sock1.recv()
    assert rx == CANFD(identifier=0x7ff,length=64,data=b'\x01' * 64)

conf.contribs['CAN']['swap-bytes'] = False

= sniff with filtermask 0x7ff

msgs = [CANFD(identifier=0x200, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8),
        CANFD(identifier=0x300, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8),
        CANFD(identifier=0x300, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8),
        CANFD(identifier=0x200, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8),
        CANFD(identifier=0x100, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8),
        CANFD(identifier=0x200, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8)]

with CANSocket(bustype='socketcan', channel='vcan0', fd=True, can_filters=[{'can_id': 0x200, 'can_mask': 0x7ff}]) as sock1, \
        CANSocket(bustype='socketcan', channel='vcan0', fd=True) as sock2:
    for m in msgs:
        sock2.send(m)
    packets = sock1.sniff(timeout=0.1, count=3)
    assert len(packets) == 3


+ bridge and sniff tests
= bridge and sniff setup vcan1 package forwarding

bashCommand = "/bin/bash -c 'sudo ip link add name vcan1 type vcan; sudo ip link set dev vcan1 up'"
assert 0 == os.system(bashCommand)

sock0 = CANSocket(bustype='socketcan', channel='vcan0', fd=True)
sock1 = CANSocket(bustype='socketcan', channel='vcan1', fd=True)

bridgeStarted = threading.Event()
def bridge():
    global bridgeStarted
    bSock0 = CANSocket(
        bustype='socketcan', channel='vcan0', bitrate=250000, fd=True)
    bSock1 = CANSocket(
        bustype='socketcan', channel='vcan1', bitrate=250000, fd=True)
    def pnr(pkt):
        return pkt
    bSock0.timeout = 0.01
    bSock1.timeout = 0.01
    bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnr, xfrm21=pnr, timeout=0.5, started_callback=bridgeStarted.set, count=6)
    bSock0.close()
    bSock1.close()

threadBridge = threading.Thread(target=bridge)
threadBridge.start()
bridgeStarted.wait(timeout=1)

sock0.send(CANFD(flags='extended', identifier=0x10010000, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8))
sock0.send(CANFD(flags='extended', identifier=0x10020000, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8))
sock0.send(CANFD(flags='extended', identifier=0x10000000, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8))
sock0.send(CANFD(flags='extended', identifier=0x10030000, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8))
sock0.send(CANFD(flags='extended', identifier=0x10040000, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8))
sock0.send(CANFD(flags='extended', identifier=0x10000000, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8))

packetsVCan1 = sock1.sniff(timeout=0.5, count=6)
assert len(packetsVCan1) == 6

sock1.close()
sock0.close()

threadBridge.join(timeout=3)
assert not threadBridge.is_alive()


= Delete vcan interfaces
~ needs_root linux vcan_socket

if 0 != call(["sudo", "ip" ,"link", "delete", "vcan0"]):
        raise Exception("vcan0 could not be deleted")

if 0 != call(["sudo", "ip" ,"link", "delete", "vcan1"]):
        raise Exception("vcan1 could not be deleted")