% Regression tests for the CANSocket
~ vcan_socket linux needs_root not_pypy
# More information at http://www.secdev.org/projects/UTscapy/
############
############
+ Configuration of CAN virtual sockets
= Load module
~ conf
# Set the CAN layer options (no byte swapping, padding removed) before the layer is loaded.
conf.contribs['CAN'] = {'swap-bytes': False, 'remove-padding': True}
load_layer("can", globals_dict=globals())
# Ask for the python-can based CANSocket, then import it from its contrib module.
# NOTE(review): presumably the 'use-python-can' flag must be set before the import - confirm.
conf.contribs['CANSocket'] = {'use-python-can': True}
from scapy.contrib.cansocket_python_can import *
= Setup string for vcan
~ conf command
# Shell command that loads the vcan kernel module, creates vcan0 and brings it up.
# The steps are chained with ';', so the exit status of the whole command is the
# status of the last step only.
bashCommand = "/bin/bash -c 'sudo modprobe vcan; sudo ip link add name vcan0 type vcan; sudo ip link set dev vcan0 up'"
= Load os
~ conf command
import os
import threading
from subprocess import call
= Setup vcan0
~ conf command
# Run the setup command; the expression is True only when it exits with status 0.
# NOTE(review): presumably UTscapy uses the value of this last expression as the test verdict - confirm.
0 == os.system(bashCommand)
= Define commonly used functions
# Event that is set once sender() has pushed all of its messages.
send_done = threading.Event()

def sender(sock, msg):
    """Send ``msg`` on ``sock`` and then set ``send_done``.
    ``msg`` may be a single message or an iterable of messages; anything
    without ``__iter__`` is wrapped in a one-element list first.
    """
    if not hasattr(msg, "__iter__"):
        msg = [msg]
    for m in msg:
        sock.send(m)
    # Signal completion only after every message has been handed to the socket.
    send_done.set()
+ Basic Packet Tests()
= CAN Packet init
# Build a CAN FD frame with a ten byte payload and compare its serialized form.
canframe = CANFD(
    identifier=0x7ff,
    length=10,
    data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab',
)
expected = b'\x00\x00\x07\xff\x0c\x04\x00\x00\x01\x02\x03\x04\x05\x06\x07\x08ab\x00\x00'
# Keep the comparison as the final bare expression of the test.
bytes(canframe) == expected
+ Basic Socket Tests()
= CAN Socket Init
# Open a socket on vcan0, close it explicitly and drop the reference.
sock1 = CANSocket(bustype='socketcan', channel='vcan0', fd=True)
sock1.close()
del sock1
sock1 = None
# Identity comparison is the correct way to test for None.
assert sock1 is None
= CAN Socket send recv small packet
# Send one CAN FD frame and one classic CAN frame from sock2 and read both on sock1.
sock1 = CANSocket(bustype='socketcan', channel='vcan0', fd=True)
sock2 = CANSocket(bustype='socketcan', channel='vcan0', fd=True)
try:
    sock2.send(CANFD(identifier=0x7ff,length=10,data=b'\x01'*10))
    sock2.send(CAN(identifier=0x7ff,length=1,data=b'\x01'))
    rx1 = sock1.recv()
    rx2 = sock1.recv()
finally:
    # Close explicitly (this test exercises close()) even when send/recv raises,
    # so a failure here does not leave sockets open for the following tests.
    sock1.close()
    sock2.close()

assert rx1 == CANFD(identifier=0x7ff,length=10,data=b'\x01'*10)
assert rx2 == CAN(identifier=0x7ff,length=1,data=b'\x01')
= CAN Socket send recv small packet test with
# Same send/receive round trip, with both sockets managed by a with statement.
with CANSocket(bustype='socketcan', channel='vcan0', fd=True) as sock1, \
        CANSocket(bustype='socketcan', channel='vcan0', fd=True) as sock2:
    sock2.send(CANFD(identifier=0x7ff,length=1,data=b'\x01'))
    rx = sock1.recv()

assert rx == CANFD(identifier=0x7ff,length=1,data=b'\x01')
= CAN Socket basecls test
# With basecls set to Raw, the received frame is compared against the raw bytes of the sent frame.
with CANSocket(bustype='socketcan', channel='vcan0', fd=True) as sock1, \
        CANSocket(bustype='socketcan', channel='vcan0', fd=True) as sock2:
    sock1.basecls = Raw
    sock2.send(CANFD(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08'))
    rx = sock1.recv()

assert rx == Raw(bytes(CANFD(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08')))
= CAN Socket send recv swapped
# Round trip with the 'swap-bytes' option enabled for the duration of this test only.
conf.contribs['CAN']['swap-bytes'] = True
try:
    with CANSocket(bustype='socketcan', channel='vcan0', fd=True) as sock1, \
            CANSocket(bustype='socketcan', channel='vcan0', fd=True) as sock2:
        sock2.send(CANFD(identifier=0x7ff,length=64,data=b'\x01' * 64))
        sock1.basecls = CAN
        rx = sock1.recv()
    assert rx == CANFD(identifier=0x7ff,length=64,data=b'\x01' * 64)
finally:
    # Always restore the global option, otherwise a failure here would leave
    # byte swapping enabled for every later test.
    conf.contribs['CAN']['swap-bytes'] = False
= sniff with filtermask 0x7ff
# Six frames; three of them carry identifier 0x200, the only one the filter lets through.
msgs = [CANFD(identifier=ident, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8)
        for ident in (0x200, 0x300, 0x300, 0x200, 0x100, 0x200)]

with CANSocket(bustype='socketcan', channel='vcan0', fd=True, can_filters=[{'can_id': 0x200, 'can_mask': 0x7ff}]) as sock1, \
        CANSocket(bustype='socketcan', channel='vcan0', fd=True) as sock2:
    for m in msgs:
        sock2.send(m)
    packets = sock1.sniff(timeout=0.1, count=3)

assert len(packets) == 3
# Check the filter itself, not just the count: only 0x200 frames may be captured.
assert all(p.identifier == 0x200 for p in packets)
+ bridge and sniff tests
= bridge and sniff setup vcan1 packet forwarding
# Create a second virtual interface so frames can be bridged from vcan0 to vcan1.
bashCommand = "/bin/bash -c 'sudo ip link add name vcan1 type vcan; sudo ip link set dev vcan1 up'"
assert 0 == os.system(bashCommand)

sock0 = CANSocket(bustype='socketcan', channel='vcan0', fd=True)
sock1 = CANSocket(bustype='socketcan', channel='vcan1', fd=True)
# Set by bridge_and_sniff (via started_callback) once the bridge is running.
bridgeStarted = threading.Event()

def bridge():
    """Bridge vcan0 and vcan1 with bridge_and_sniff (count=6, timeout=0.5), then close both bridge sockets."""
    def pnr(pkt):
        # Identity transform: hand every packet to the other side unchanged.
        return pkt
    # The with statement closes both sockets even if bridge_and_sniff raises.
    with CANSocket(bustype='socketcan', channel='vcan0', bitrate=250000, fd=True) as bSock0, \
            CANSocket(bustype='socketcan', channel='vcan1', bitrate=250000, fd=True) as bSock1:
        bSock0.timeout = 0.01
        bSock1.timeout = 0.01
        bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnr, xfrm21=pnr, timeout=0.5, started_callback=bridgeStarted.set, count=6)

threadBridge = threading.Thread(target=bridge)
threadBridge.start()
bridgeStarted.wait(timeout=1)

try:
    # Six extended-id frames are sent on vcan0; all of them should show up on vcan1.
    for ident in (0x10010000, 0x10020000, 0x10000000, 0x10030000, 0x10040000, 0x10000000):
        sock0.send(CANFD(flags='extended', identifier=ident, length=64, data=b'\x01\x02\x03\x04\x05\x06\x07\x08' * 8))
    packetsVCan1 = sock1.sniff(timeout=0.5, count=6)
finally:
    # Release the sockets and wait for the bridge thread before asserting, so a
    # failed check does not leave open sockets or a running thread behind.
    sock1.close()
    sock0.close()
    threadBridge.join(timeout=3)

assert len(packetsVCan1) == 6
assert not threadBridge.is_alive()
= Delete vcan interfaces
~ needs_root linux vcan_socket
# Try to delete both interfaces even when the first deletion fails, then report
# every interface that could not be removed.
failed = [iface for iface in ("vcan0", "vcan1")
          if 0 != call(["sudo", "ip", "link", "delete", iface])]
if failed:
    raise Exception("%s could not be deleted" % ", ".join(failed))