File: tcp.py

package info (click to toggle)
python-dpkt 1.9.8-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,104 kB
  • sloc: python: 14,911; makefile: 23
file content (234 lines) | stat: -rw-r--r-- 6,969 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# $Id: tcp.py 42 2007-08-02 22:38:47Z jon.oberheide $
# -*- coding: utf-8 -*-
"""Transmission Control Protocol."""
from __future__ import print_function
from __future__ import absolute_import

from . import dpkt
from .compat import compat_ord

# TCP control flags.
# Each constant is a single-bit mask; masks are combined with `|` (for example
# TH_SYN | TH_ACK) and tested with `&`, as tcp_flags_to_str() does.
TH_FIN = 0x01  # end of data
TH_SYN = 0x02  # synchronize sequence numbers
TH_RST = 0x04  # reset connection
TH_PUSH = 0x08  # push
TH_ACK = 0x10  # acknowledgment number set
TH_URG = 0x20  # urgent pointer set
TH_ECE = 0x40  # ECN echo, RFC 3168
TH_CWR = 0x80  # congestion window reduced
TH_NS = 0x100  # nonce sum, RFC 3540; ninth bit, so it does not fit in one byte

TCP_PORT_MAX = 65535  # maximum port (largest value of a 16-bit 'H' field)
TCP_WIN_MAX = 65535  # maximum (unscaled) window; default of TCP's `win` field


def tcp_flags_to_str(val):
    """Return the names of the TCP flags set in ``val`` as one string.

    Args:
        val: integer bit field built from the ``TH_*`` masks.

    Returns:
        The names of all set flags joined with commas, always in the fixed
        order FIN, SYN, RST, PUSH, ACK, URG, ECE, CWR, NS. An empty string is
        returned when none of these bits is set.
    """
    # (mask, label) pairs, listed in the order the labels must appear
    flag_names = (
        (TH_FIN, 'FIN'),
        (TH_SYN, 'SYN'),
        (TH_RST, 'RST'),
        (TH_PUSH, 'PUSH'),
        (TH_ACK, 'ACK'),
        (TH_URG, 'URG'),
        (TH_ECE, 'ECE'),
        (TH_CWR, 'CWR'),
        (TH_NS, 'NS'),
    )
    return ','.join(label for mask, label in flag_names if val & mask)


class TCP(dpkt.Packet):
    """Transmission Control Protocol.

    The Transmission Control Protocol (TCP) is one of the main protocols of the Internet protocol suite.
    It originated in the initial network implementation in which it complemented the Internet Protocol (IP).

    The fixed part of the header is declared in ``__hdr__``; the 16-bit
    ``_off_flags`` word is split into ``off``, ``_rsv`` and ``flags`` through
    ``__bit_fields__``. Both tables are consumed by the ``dpkt.Packet`` base
    class, which is defined outside this module.

    Attributes:
        sport - source port
        dport - destination port
        seq   - sequence number
        ack   - acknowledgement number
        off   - data offset in 32-bit words
        flags - TCP flags
        win   - TCP window size
        sum   - checksum
        urp   - urgent pointer
        opts  - TCP options buffer; call parse_opts() to parse
    """

    # (field name, struct format character, default value)
    __hdr__ = (
        ('sport', 'H', 0xdead),
        ('dport', 'H', 0),
        ('seq', 'I', 0xdeadbeef),
        ('ack', 'I', 0),
        # default: data offset of 5 words (no options) with only SYN set
        ('_off_flags', 'H', (5 << 12) | TH_SYN),
        ('win', 'H', TCP_WIN_MAX),
        ('sum', 'H', 0),
        ('urp', 'H', 0),
    )
    # sub-fields of `_off_flags`, listed from the most significant bits down
    __bit_fields__ = {
        '_off_flags': (
            ('off', 4),    # 4 hi bits
            ('_rsv', 3),   # 3 bits reserved
            ('flags', 9),  # 9 lo bits
        ),
    }
    # per-field formatters used when pretty-printing
    __pprint_funcs__ = {
        'flags': tcp_flags_to_str,
        'sum': hex,  # display checksum in hex
    }
    # raw options bytes located between the fixed header and the payload
    opts = b''

    def __len__(self):
        """Return the total size: fixed header + options + payload."""
        options_len = len(self.opts)
        payload_len = len(self.data)
        return self.__hdr_len__ + options_len + payload_len

    def __bytes__(self):
        """Return the packed header followed by the options and the payload."""
        header = self.pack_hdr()
        options = bytes(self.opts)
        payload = bytes(self.data)
        return header + options + payload

    def unpack(self, buf):
        """Decode ``buf`` into header fields, ``opts`` and ``data``.

        Raises:
            dpkt.UnpackError: if the data offset stored in the header points
                inside the fixed header.
        """
        dpkt.Packet.unpack(self, buf)
        fixed_len = self.__hdr_len__
        # top 4 bits hold the data offset in 32-bit words; << 2 converts to bytes
        payload_start = (self._off_flags >> 12) << 2
        if payload_start < fixed_len:
            raise dpkt.UnpackError('invalid header length')
        # everything between the fixed header and the payload is options
        self.opts = buf[fixed_len:payload_start]
        self.data = buf[payload_start:]


# Options (opt_type) - http://www.iana.org/assignments/tcp-parameters
# These are the values of the first byte of an option, i.e. the first element
# of each (option, data) tuple produced by parse_opts().
TCP_OPT_EOL = 0  # end of option list
TCP_OPT_NOP = 1  # no operation
TCP_OPT_MSS = 2  # maximum segment size
TCP_OPT_WSCALE = 3  # window scale factor, RFC 1072
TCP_OPT_SACKOK = 4  # SACK permitted, RFC 2018
TCP_OPT_SACK = 5  # SACK, RFC 2018
TCP_OPT_ECHO = 6  # echo (obsolete), RFC 1072
TCP_OPT_ECHOREPLY = 7  # echo reply (obsolete), RFC 1072
TCP_OPT_TIMESTAMP = 8  # timestamp, RFC 1323
TCP_OPT_POCONN = 9  # partial order conn, RFC 1693
TCP_OPT_POSVC = 10  # partial order service, RFC 1693
TCP_OPT_CC = 11  # connection count, RFC 1644
TCP_OPT_CCNEW = 12  # CC.NEW, RFC 1644
TCP_OPT_CCECHO = 13  # CC.ECHO, RFC 1644
TCP_OPT_ALTSUM = 14  # alt checksum request, RFC 1146
TCP_OPT_ALTSUMDATA = 15  # alt checksum data, RFC 1146
TCP_OPT_SKEETER = 16  # Skeeter
TCP_OPT_BUBBA = 17  # Bubba
TCP_OPT_TRAILSUM = 18  # trailer checksum
TCP_OPT_MD5 = 19  # MD5 signature, RFC 2385
TCP_OPT_SCPS = 20  # SCPS capabilities
TCP_OPT_SNACK = 21  # selective negative acks
TCP_OPT_REC = 22  # record boundaries
TCP_OPT_CORRUPT = 23  # corruption experienced
TCP_OPT_SNAP = 24  # SNAP
# note: no constant is defined here for option number 25
TCP_OPT_TCPCOMP = 26  # TCP compression filter
# NOTE(review): presumably one past the highest option number defined above;
# it is not referenced anywhere else in this module - confirm before relying on it
TCP_OPT_MAX = 27


def parse_opts(buf):
    """Parse TCP option buffer into a list of (option, data) tuples.

    Args:
        buf: raw options bytes, e.g. the ``opts`` attribute of a TCP object.

    Returns:
        A list with one ``(option, data)`` tuple per option, in buffer order.
        Options 0 (EOL) and 1 (NOP) always carry empty data. Parsing runs
        until the buffer is exhausted, so bytes following an EOL option are
        parsed as well. If an option that requires a length byte is cut off
        before that byte, ``None`` is appended as the final element and
        parsing stops.
    """
    parsed = []
    while buf:
        # compat_ord comes from .compat; presumably it yields the integer
        # value of the byte on both Python 2 and Python 3
        kind = compat_ord(buf[0])
        if kind <= TCP_OPT_NOP:
            # options 0 and 1 are not followed by length byte
            payload = b''
            buf = buf[1:]
        else:
            try:
                # the length covers the type and length bytes themselves, so
                # clamp it to 2 to always move forward through the buffer
                opt_len = max(2, compat_ord(buf[1]))
                # slicing never overruns: an oversized length simply yields
                # whatever data is left
                payload = buf[2:opt_len]
                buf = buf[opt_len:]
            except (IndexError, ValueError):
                # truncated option: record a placeholder and give up
                parsed.append(None)  # XXX
                break
        parsed.append((kind, payload))
    return parsed


def test_parse_opts():
    """Check parse_opts() on regular, zero-length and truncated options."""
    cases = (
        # normal scenario: MSS, two NOPs, SACK permitted
        (
            b'\x02\x04\x23\x00\x01\x01\x04\x02',
            [
                (TCP_OPT_MSS, b'\x23\x00'),
                (TCP_OPT_NOP, b''),
                (TCP_OPT_NOP, b''),
                (TCP_OPT_SACKOK, b''),
            ],
        ),
        # normal scenario: two NOPs followed by a SACK block
        (
            b'\x01\x01\x05\x0a\x37\xf8\x19\x70\x37\xf8\x29\x78',
            [
                (TCP_OPT_NOP, b''),
                (TCP_OPT_NOP, b''),
                (TCP_OPT_SACK, b'\x37\xf8\x19\x70\x37\xf8\x29\x78'),
            ],
        ),
        # a zero-length option still consumes its type and length bytes
        (
            b'\x02\x00\x01',
            [
                (TCP_OPT_MSS, b''),
                (TCP_OPT_NOP, b''),
            ],
        ),
        # a one-byte malformed option yields the None placeholder
        (
            b'\xff',
            [None],
        ),
    )
    for raw, expected in cases:
        assert parse_opts(raw) == expected


def test_offset():
    """Read the data offset of a parsed header, then change it and re-pack."""
    raw = b'\x01\xbb\xc0\xd7\xb6\x56\xa8\xb9\xd1\xac\xaa\xb1\x50\x18\x40\x00\x56\xf8\x00\x00'
    segment = TCP(raw)
    assert segment.off == 5

    # only the high nibble of the 13th byte (0x50 -> 0x80) should change
    segment.off = 8
    repacked = bytes(segment)
    assert repacked == b'\x01\xbb\xc0\xd7\xb6\x56\xa8\xb9\xd1\xac\xaa\xb1\x80\x18\x40\x00\x56\xf8\x00\x00'


def test_tcp_flags_to_str():
    """Check the flag names and their ordering for a few bit patterns."""
    expectations = (
        (0x18, 'PUSH,ACK'),
        (0x12, 'SYN,ACK'),
        # all nine flags at once, for code coverage
        (0x1ff, 'FIN,SYN,RST,PUSH,ACK,URG,ECE,CWR,NS'),
    )
    for bits, text in expectations:
        assert tcp_flags_to_str(bits) == text


def test_tcp_unpack():
    """Decode a 28-byte SYN/ACK segment and check the header fields."""
    raw = (b'\x00\x50\x0d\x2c\x11\x4c\x61\x8b\x38\xaf\xfe\x14\x70\x12\x16\xd0'
           b'\x5b\xdc\x00\x00\x02\x04\x05\x64\x01\x01\x04\x02')
    segment = TCP(raw)
    expected_fields = (
        ('flags', TH_SYN | TH_ACK),
        ('off', 7),
        ('win', 5840),
        ('dport', 3372),
        ('seq', 290218379),
        ('ack', 951057940),
    )
    for field, expected in expected_fields:
        assert getattr(segment, field) == expected


def test_tcp_pack():
    """Build a SYN segment with options and compare its serialised bytes."""
    expected_wire = (
        b'\x0d\x2c\x00\x50\x38\xaf\xfe\x13\x00\x00\x00\x00\x70\x02\x22\x38'
        b'\xc3\x0c\x00\x00\x02\x04\x05\xb4\x01\x01\x04\x02')
    options = b'\x02\x04\x05\xb4\x01\x01\x04\x02'
    segment = TCP(
        sport=3372,
        dport=80,
        seq=951057939,
        ack=0,
        off=7,  # 5 header words + 2 words (8 bytes) of options
        flags=TH_SYN,
        win=8760,
        sum=0xc30c,
        urp=0,
        opts=options
    )
    assert bytes(segment) == expected_wire

    # TODO: add checksum calculation