1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
|
% Regression tests for nativecanfdsocket
~ not_pypy vcan_socket needs_root linux
# More information at http://www.secdev.org/projects/UTscapy/
############
############
+ Configuration of CAN virtual sockets
~ conf
= Load module
load_layer("can", globals_dict=globals())
conf.contribs['CANSocket'] = {'use-python-can': False}
from scapy.contrib.cansocket_native import *
conf.contribs['CAN'] = {'swap-bytes': False, 'remove-padding': True}
= Setup string for vcan
bashCommand = "/bin/bash -c 'sudo modprobe vcan; sudo ip link add name vcan0 type vcan; sudo ip link set dev vcan0 up'"
= Load os
import os
import threading
from time import sleep
from subprocess import call
= Setup vcan0
assert 0 == os.system(bashCommand)
+ Basic Packet Tests()
= CAN FD Packet init
canfdframe = CANFD(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa')
assert bytes(canfdframe) == b'\x00\x00\x07\xff\x08\x04\x00\x00\x01\x02\x03\x04\x05\x06\x07\x08\xaa'
+ Basic Socket Tests()
= CAN FD Socket Init
sock1 = CANSocket(channel="vcan0", fd=True)
= CAN Socket send recv small packet without remove padding
conf.contribs['CAN'] = {'swap-bytes': False, 'remove-padding': False}
sock2 = CANSocket(channel="vcan0", fd=True)
sock2.send(CANFD(identifier=0x7ff,length=9,data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa'))
sock2.send(CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08'))
sock2.close()
rx = sock1.recv()
assert rx == CANFD(identifier=0x7ff,length=12,data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa\x00\x00\x00') / Padding(b"\x00" * (64 - 12))
rx = sock1.recv()
# different Kernel Versions produce different packets
hexdump(rx)
test = CANFD(identifier=0x7ff, fd_flags=0, length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') / Padding(b"\x00" * (64 - 8))
hexdump(test)
test2 = CANFD(identifier=0x7ff,fd_flags=4, length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') / Padding(b"\x00" * (64 - 8))
hexdump(test2)
assert bytes(rx) in [bytes(test), bytes(test2)]
= CAN Socket send recv
conf.contribs['CAN'] = {'swap-bytes': False, 'remove-padding': True}
sock2 = CANSocket(channel="vcan0", fd=True)
sock2.send(CANFD(identifier=0x7ff,length=9,data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa'))
sock2.close()
rx = sock1.recv()
assert rx == CANFD(identifier=0x7ff,length=12, fd_flags=4, data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa\x00\x00\x00')
= CAN Socket basecls test
sock2 = CANSocket(channel="vcan0", fd=True)
sock2.send(CANFD(identifier=0x7ff,length=9,data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa'))
sock2.close()
sock1.basecls = Raw
rx = sock1.recv()
assert rx.load == bytes(CANFD(identifier=0x7ff, fd_flags=4, length=12,data=b'\x01\x02\x03\x04\x05\x06\x07\x08\xaa\x00\x00\x00' + b'\x00' * (64 - 12)))
= sniff with filtermask 0x1FFFFFFF and inverse filter
sock1 = CANSocket(channel='vcan0', fd=True, can_filters=[{'can_id': 0x10000000 | CAN_INV_FILTER, 'can_mask': 0x1fffffff}])
sock2 = CANSocket(channel="vcan0", fd=True)
sock2.send(CANFD(flags='extended', identifier=0x10010000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab'))
sock2.send(CANFD(flags='extended', identifier=0x10020000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab'))
sock2.send(CANFD(flags='extended', identifier=0x10000000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab'))
sock2.send(CANFD(flags='extended', identifier=0x10030000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab'))
sock2.send(CANFD(flags='extended', identifier=0x10040000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab'))
sock2.send(CANFD(flags='extended', identifier=0x10000000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x08ab'))
sock2.close()
packets = sock1.sniff(timeout=0.1, verbose=False, count=4)
assert len(packets) == 4
sock1.close()
+ bridge and sniff tests
= bridge and sniff setup vcan1 package forwarding
bashCommand = "/bin/bash -c 'sudo ip link add name vcan1 type vcan; sudo ip link set dev vcan1 up'"
assert 0 == os.system(bashCommand)
sock0 = CANSocket(channel='vcan0', fd=True)
sock1 = CANSocket(channel='vcan1', fd=True)
bridgeStarted = threading.Event()
def bridge():
global bridgeStarted
bSock0 = CANSocket(channel="vcan0", fd=True)
bSock1 = CANSocket(channel='vcan1', fd=True)
def pnr(pkt):
return pkt
bridgeStarted.set()
bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnr, xfrm21=pnr, timeout=0.2, verbose=False, count=6)
bSock0.close()
bSock1.close()
threadBridge = threading.Thread(target=bridge)
threadBridge.start()
bridgeStarted.wait(timeout=5)
sock0.send(CANFD(flags='extended', identifier=0x10010000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x0842'))
sock0.send(CANFD(flags='extended', identifier=0x10020000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x0842'))
sock0.send(CANFD(flags='extended', identifier=0x10000000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x0842'))
sock0.send(CANFD(flags='extended', identifier=0x10030000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x0842'))
sock0.send(CANFD(flags='extended', identifier=0x10040000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x0842'))
sock0.send(CANFD(flags='extended', identifier=0x10000000, length=10, data=b'\x01\x02\x03\x04\x05\x06\x07\x0842'))
packetsVCan1 = sock1.sniff(timeout=0.1, verbose=False, count=6)
assert len(packetsVCan1) == 6
threadBridge.join(timeout=5)
assert not threadBridge.is_alive()
sock1.close()
sock0.close()
= Delete vcan interfaces
if 0 != call(["sudo", "ip", "link", "delete", "vcan0"]):
raise Exception("vcan0 could not be deleted")
if 0 != call(["sudo", "ip", "link", "delete", "vcan1"]):
raise Exception("vcan1 could not be deleted")
|