File: tftp.uts

package info (click to toggle)
scapy 2.6.1%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 13,956 kB
  • sloc: python: 163,618; sh: 90; makefile: 11
file content (174 lines) | stat: -rw-r--r-- 5,757 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
% Regression tests for TFTP

# More information at http://www.secdev.org/projects/UTscapy/

+ TFTP coverage tests

= Test answers

assert TFTP_DATA(block=1).answers(TFTP_RRQ())
assert not TFTP_WRQ().answers(TFTP_RRQ())
assert not TFTP_RRQ().answers(TFTP_WRQ())
assert TFTP_ACK(block=1).answers(TFTP_DATA(block=1))
assert not TFTP_ACK(block=0).answers(TFTP_DATA(block=1))
assert TFTP_ACK(block=0).answers(TFTP_RRQ())
assert not TFTP_ACK().answers(TFTP_ACK())
assert TFTP_ERROR().answers(TFTP_DATA()) and TFTP_ERROR().answers(TFTP_ACK())
assert TFTP_OACK().answers(TFTP_WRQ())

+ TFTP Automatons
~ linux

= Utilities
~ linux

from scapy.automaton import select_objects

class MockTFTPSocket(object):
    packets = []
    def recv(self, n=None):
        pkt = self.packets.pop(0)
        return pkt
    def send(self, *args, **kargs):
        pass
    def close(self):
        pass
    @classmethod
    def select(classname, inputs, remain):
        test = [s for s in inputs if isinstance(s, classname)]
        if test:
            if len(test[0].packets):
                return test
            else:
                inputs = [s for s in inputs if not isinstance(s, classname)]
        return select_objects(inputs, remain)


= TFTP_read() automaton
~ linux

class MockReadSocket(MockTFTPSocket):
    packets = [IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_DATA(block=1) / ("P" * 512),
               IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_DATA(block=2) / "<3"]

tftp_read = TFTP_read("file.txt", "1.2.3.4", sport=0x2807,
                      ll=MockReadSocket,
                      recvsock=MockReadSocket, debug=5)

res = tftp_read.run()
assert res == (b"P" * 512 + b"<3")

= TFTP_read() automaton error
~ linux

class MockReadSocket(MockTFTPSocket):
    packets = [IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ERROR(errorcode=2, errormsg="Fatal error")]

tftp_read = TFTP_read("file.txt", "1.2.3.4", sport=0x2807,
                      ll=MockReadSocket,
                      recvsock=MockReadSocket)

try:
    tftp_read.run()
    assert False
except Automaton.ErrorState as e:
    assert "Reached ERROR" in str(e)
    assert "ERROR Access violation" in str(e)


= TFTP_write() automaton
~ linux

data_received = b""

class MockWriteSocket(MockTFTPSocket):
    packets = [IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ACK(block=0),
               IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ACK(block=1) ]
    def send(self, *args, **kargs):
        if len(args) and Raw in args[0]:
            global data_received
            data_received += args[0][Raw].load

tftp_write = TFTP_write("file.txt", "P" * 767 + "Scapy <3", "1.2.3.4", sport=0x2807,
                        ll=MockWriteSocket,
                        recvsock=MockWriteSocket)

tftp_write.run()
assert data_received == (b"P" * 767 + b"Scapy <3")

= TFTP_write() automaton error
~ linux

class MockWriteSocket(MockTFTPSocket):
    packets = [IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ERROR(errorcode=2, errormsg="Fatal error")]

tftp_write = TFTP_write("file.txt", "P" * 767 + "Scapy <3", "1.2.3.4", sport=0x2807,
                        ll=MockWriteSocket,
                        recvsock=MockWriteSocket)

try:
    tftp_write.run()
    assert False
except Automaton.ErrorState as e:
    assert "Reached ERROR" in str(e)
    assert "ERROR Access violation" in str(e)


= TFTP_WRQ_server() automaton
~ linux

class MockWRQSocket(MockTFTPSocket):
    packets = [IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_WRQ(filename="scapy.txt"),
               IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=1) / ("P" * 512),
               IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=2) / "<3"]

tftp_wrq = TFTP_WRQ_server(ip="1.2.3.4", sport=0x2807,
                           ll=MockWRQSocket,
                           recvsock=MockWRQSocket)
assert tftp_wrq.run() == (b"scapy.txt", (b"P" * 512 + b"<3"))

= TFTP_WRQ_server() automaton with options
~ linux

class MockWRQSocket(MockTFTPSocket):
    packets = [IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_WRQ(filename="scapy.txt") / TFTP_Options(options=[TFTP_Option(oname="blksize", value="100")]),
               IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=1) / ("P" * 100),
               IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=2) / "<3"]

tftp_wrq = TFTP_WRQ_server(ip="1.2.3.4", sport=0x2807,
                           ll=MockWRQSocket,
                           recvsock=MockWRQSocket)
assert tftp_wrq.run() == (b"scapy.txt", (b"P" * 100 + b"<3"))

= TFTP_RRQ_server() automaton
~ linux

sent_data = "P" * 512 + "<3"
import tempfile
filename = tempfile.mktemp(suffix=".txt")
fdesc = open(filename, "w")
fdesc.write(sent_data)
fdesc.close()

received_data = ""

class MockRRQSocket(MockTFTPSocket):
    packets = [IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_RRQ(filename="scapy.txt") / TFTP_Options(options=[TFTP_Option(oname="blksize", value="100")]),
               IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_RRQ(filename=filename[5:]) / TFTP_Options(),
               IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_ACK(block=1),
               IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_ACK(block=2) ]
    def send(self, *args, **kargs):
        if len(args):
            pkt = args[0]
            if TFTP_DATA in pkt:
                global received_data
                received_data += pkt[Raw].load.decode("utf-8")

tftp_rrq = TFTP_RRQ_server(ip="1.2.3.4", sport=0x2807, dir="/tmp/", serve_one=True,
                           ll=MockRRQSocket,
                           recvsock=MockRRQSocket)
tftp_rrq.run()
assert received_data == sent_data

import os
os.unlink(filename)