1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
|
% Regression tests for TFTP
# More information at http://www.secdev.org/projects/UTscapy/
+ TFTP coverage tests
= Test answers
# Requests reused by several of the checks below.
read_req = TFTP_RRQ()
write_req = TFTP_WRQ()
first_block = TFTP_DATA(block=1)
# Pairings that answers() must accept.
assert first_block.answers(read_req)
assert TFTP_ACK(block=1).answers(first_block)
assert TFTP_ACK(block=0).answers(read_req)
assert TFTP_ERROR().answers(TFTP_DATA())
assert TFTP_ERROR().answers(TFTP_ACK())
assert TFTP_OACK().answers(write_req)
# Pairings that answers() must reject.
assert not write_req.answers(read_req)
assert not read_req.answers(write_req)
assert not TFTP_ACK(block=0).answers(first_block)
assert not TFTP_ACK().answers(TFTP_ACK())
+ TFTP Automatons
~ linux
= Utilities
~ linux
from scapy.automaton import select_objects
class MockTFTPSocket(object):
    """Socket stand-in that replays the canned packets listed in ``packets``."""
    # Packets handed out by recv(), first to last.
    packets = []
    def recv(self, n=None):
        """Remove and return the next queued packet; ``n`` is ignored."""
        return self.packets.pop(0)
    def send(self, *args, **kargs):
        """Accept and discard whatever is sent."""
    def close(self):
        """Nothing to release."""
    @classmethod
    def select(cls, inputs, remain):
        """Return the mock sockets in ``inputs`` while the first one has packets left.

        Once that queue is empty the mocks are removed from ``inputs`` and the
        remaining objects are passed to select_objects() with ``remain``.
        """
        mocks = [sock for sock in inputs if isinstance(sock, cls)]
        if mocks and mocks[0].packets:
            return mocks
        if mocks:
            # Drained mocks must not be handed to select_objects().
            inputs = [sock for sock in inputs if not isinstance(sock, cls)]
        return select_objects(inputs, remain)
= TFTP_read() automaton
~ linux
class MockReadSocket(MockTFTPSocket):
    # Replayed in order by recv(): one 512-byte DATA block, then a short one.
    packets = [
        IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_DATA(block=1) / ("P" * 512),
        IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_DATA(block=2) / "<3",
    ]

tftp_read = TFTP_read(
    "file.txt",
    "1.2.3.4",
    sport=0x2807,
    ll=MockReadSocket,
    recvsock=MockReadSocket,
    debug=5,
)
# The automaton must return both payloads concatenated.
res = tftp_read.run()
expected_payload = b"P" * 512 + b"<3"
assert res == expected_payload
= TFTP_read() automaton error
~ linux
class MockReadSocket(MockTFTPSocket):
    # The only packet replayed is a TFTP error with code 2.
    packets = [
        IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ERROR(errorcode=2, errormsg="Fatal error"),
    ]

tftp_read = TFTP_read(
    "file.txt",
    "1.2.3.4",
    sport=0x2807,
    ll=MockReadSocket,
    recvsock=MockReadSocket,
)
try:
    tftp_read.run()
except Automaton.ErrorState as err:
    message = str(err)
    assert "Reached ERROR" in message
    assert "ERROR Access violation" in message
else:
    # Finishing without an ErrorState is a test failure.
    assert False
= TFTP_write() automaton
~ linux
# Collects every Raw payload the automaton sends through the mock socket.
data_received = b""
class MockWriteSocket(MockTFTPSocket):
    # Replayed in order by recv(): acknowledgements for blocks 0 and 1.
    packets = [
        IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ACK(block=0),
        IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ACK(block=1),
    ]
    def send(self, *args, **kargs):
        """Append the Raw payload of the first positional argument, if any."""
        global data_received
        if not args:
            return
        outgoing = args[0]
        if Raw in outgoing:
            data_received += outgoing[Raw].load

file_contents = "P" * 767 + "Scapy <3"
tftp_write = TFTP_write(
    "file.txt",
    file_contents,
    "1.2.3.4",
    sport=0x2807,
    ll=MockWriteSocket,
    recvsock=MockWriteSocket,
)
tftp_write.run()
# Everything handed to the automaton must have gone out through send().
assert data_received == (b"P" * 767 + b"Scapy <3")
= TFTP_write() automaton error
~ linux
class MockWriteSocket(MockTFTPSocket):
    # The only packet replayed is a TFTP error with code 2.
    packets = [
        IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ERROR(errorcode=2, errormsg="Fatal error"),
    ]

file_contents = "P" * 767 + "Scapy <3"
tftp_write = TFTP_write(
    "file.txt",
    file_contents,
    "1.2.3.4",
    sport=0x2807,
    ll=MockWriteSocket,
    recvsock=MockWriteSocket,
)
try:
    tftp_write.run()
except Automaton.ErrorState as err:
    message = str(err)
    assert "Reached ERROR" in message
    assert "ERROR Access violation" in message
else:
    # Finishing without an ErrorState is a test failure.
    assert False
= TFTP_WRQ_server() automaton
~ linux
class MockWRQSocket(MockTFTPSocket):
    # Replayed in order by recv(): a write request, then two DATA blocks.
    packets = [
        IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_WRQ(filename="scapy.txt"),
        IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=1) / ("P" * 512),
        IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=2) / "<3",
    ]

tftp_wrq = TFTP_WRQ_server(
    ip="1.2.3.4",
    sport=0x2807,
    ll=MockWRQSocket,
    recvsock=MockWRQSocket,
)
# run() must return the requested file name and the concatenated payloads.
upload = tftp_wrq.run()
assert upload == (b"scapy.txt", (b"P" * 512 + b"<3"))
= TFTP_WRQ_server() automaton with options
~ linux
class MockWRQSocket(MockTFTPSocket):
    # Replayed in order by recv(): a write request carrying a blksize option
    # of 100, a 100-byte DATA block, then a short one.
    packets = [
        IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_WRQ(filename="scapy.txt")
        / TFTP_Options(options=[TFTP_Option(oname="blksize", value="100")]),
        IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=1) / ("P" * 100),
        IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=2) / "<3",
    ]

tftp_wrq = TFTP_WRQ_server(
    ip="1.2.3.4",
    sport=0x2807,
    ll=MockWRQSocket,
    recvsock=MockWRQSocket,
)
# run() must return the requested file name and the concatenated payloads.
upload = tftp_wrq.run()
assert upload == (b"scapy.txt", (b"P" * 100 + b"<3"))
= TFTP_RRQ_server() automaton
~ linux
import os
import tempfile
sent_data = "P" * 512 + "<3"
# mkstemp() creates the file itself; the deprecated mktemp() only returned a
# name that another process could claim before it was opened.
fd, filename = tempfile.mkstemp(suffix=".txt")
with os.fdopen(fd, "w") as fdesc:
    fdesc.write(sent_data)

# Derive the served directory and the requested name from the real path
# instead of assuming a 5-character "/tmp/" prefix.
served_dir = os.path.join(os.path.dirname(filename), "")
served_name = os.path.basename(filename)
# Collects the payload of every DATA packet the server sends.
received_data = ""
class MockRRQSocket(MockTFTPSocket):
    # Replayed in order by recv(): a request for "scapy.txt" with a blksize
    # option, a request for the temporary file, then two acknowledgements.
    packets = [
        IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_RRQ(filename="scapy.txt")
        / TFTP_Options(options=[TFTP_Option(oname="blksize", value="100")]),
        IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_RRQ(filename=served_name) / TFTP_Options(),
        IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_ACK(block=1),
        IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_ACK(block=2),
    ]
    def send(self, *args, **kargs):
        """Append the decoded payload of any TFTP_DATA packet that is sent."""
        global received_data
        if not args:
            return
        pkt = args[0]
        if TFTP_DATA in pkt:
            received_data += pkt[Raw].load.decode("utf-8")

tftp_rrq = TFTP_RRQ_server(
    ip="1.2.3.4",
    sport=0x2807,
    dir=served_dir,
    serve_one=True,
    ll=MockRRQSocket,
    recvsock=MockRRQSocket,
)
try:
    tftp_rrq.run()
finally:
    # Remove the temporary file even when the automaton raises.
    os.unlink(filename)

assert received_data == sent_data
|