1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234
|
# $Id: tcp.py 42 2007-08-02 22:38:47Z jon.oberheide $
# -*- coding: utf-8 -*-
"""Transmission Control Protocol."""
from __future__ import print_function
from __future__ import absolute_import
from . import dpkt
from .compat import compat_ord
# TCP control flags
TH_FIN = 0x01 # end of data
TH_SYN = 0x02 # synchronize sequence numbers
TH_RST = 0x04 # reset connection
TH_PUSH = 0x08 # push
TH_ACK = 0x10 # acknowledgment number set
TH_URG = 0x20 # urgent pointer set
TH_ECE = 0x40 # ECN echo, RFC 3168
TH_CWR = 0x80 # congestion window reduced
TH_NS = 0x100 # nonce sum, RFC 3540
TCP_PORT_MAX = 65535 # maximum port
TCP_WIN_MAX = 65535 # maximum (unscaled) window
def tcp_flags_to_str(val):
ff = []
if val & TH_FIN:
ff.append('FIN')
if val & TH_SYN:
ff.append('SYN')
if val & TH_RST:
ff.append('RST')
if val & TH_PUSH:
ff.append('PUSH')
if val & TH_ACK:
ff.append('ACK')
if val & TH_URG:
ff.append('URG')
if val & TH_ECE:
ff.append('ECE')
if val & TH_CWR:
ff.append('CWR')
if val & TH_NS:
ff.append('NS')
return ','.join(ff)
class TCP(dpkt.Packet):
"""Transmission Control Protocol.
The Transmission Control Protocol (TCP) is one of the main protocols of the Internet protocol suite.
It originated in the initial network implementation in which it complemented the Internet Protocol (IP).
Attributes:
sport - source port
dport - destination port
seq - sequence number
ack - acknowledgement number
off - data offset in 32-bit words
flags - TCP flags
win - TCP window size
sum - checksum
urp - urgent pointer
opts - TCP options buffer; call parse_opts() to parse
"""
__hdr__ = (
('sport', 'H', 0xdead),
('dport', 'H', 0),
('seq', 'I', 0xdeadbeef),
('ack', 'I', 0),
('_off_flags', 'H', ((5 << 12) | TH_SYN)),
('win', 'H', TCP_WIN_MAX),
('sum', 'H', 0),
('urp', 'H', 0)
)
__bit_fields__ = {
'_off_flags': (
('off', 4), # 4 hi bits
('_rsv', 3), # 3 bits reserved
('flags', 9), # 9 lo bits
)
}
__pprint_funcs__ = {
'flags': tcp_flags_to_str,
'sum': hex, # display checksum in hex
}
opts = b''
def __len__(self):
return self.__hdr_len__ + len(self.opts) + len(self.data)
def __bytes__(self):
return self.pack_hdr() + bytes(self.opts) + bytes(self.data)
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
ol = ((self._off_flags >> 12) << 2) - self.__hdr_len__
if ol < 0:
raise dpkt.UnpackError('invalid header length')
self.opts = buf[self.__hdr_len__:self.__hdr_len__ + ol]
self.data = buf[self.__hdr_len__ + ol:]
# Options (opt_type) - http://www.iana.org/assignments/tcp-parameters
TCP_OPT_EOL = 0 # end of option list
TCP_OPT_NOP = 1 # no operation
TCP_OPT_MSS = 2 # maximum segment size
TCP_OPT_WSCALE = 3 # window scale factor, RFC 1072
TCP_OPT_SACKOK = 4 # SACK permitted, RFC 2018
TCP_OPT_SACK = 5 # SACK, RFC 2018
TCP_OPT_ECHO = 6 # echo (obsolete), RFC 1072
TCP_OPT_ECHOREPLY = 7 # echo reply (obsolete), RFC 1072
TCP_OPT_TIMESTAMP = 8 # timestamp, RFC 1323
TCP_OPT_POCONN = 9 # partial order conn, RFC 1693
TCP_OPT_POSVC = 10 # partial order service, RFC 1693
TCP_OPT_CC = 11 # connection count, RFC 1644
TCP_OPT_CCNEW = 12 # CC.NEW, RFC 1644
TCP_OPT_CCECHO = 13 # CC.ECHO, RFC 1644
TCP_OPT_ALTSUM = 14 # alt checksum request, RFC 1146
TCP_OPT_ALTSUMDATA = 15 # alt checksum data, RFC 1146
TCP_OPT_SKEETER = 16 # Skeeter
TCP_OPT_BUBBA = 17 # Bubba
TCP_OPT_TRAILSUM = 18 # trailer checksum
TCP_OPT_MD5 = 19 # MD5 signature, RFC 2385
TCP_OPT_SCPS = 20 # SCPS capabilities
TCP_OPT_SNACK = 21 # selective negative acks
TCP_OPT_REC = 22 # record boundaries
TCP_OPT_CORRUPT = 23 # corruption experienced
TCP_OPT_SNAP = 24 # SNAP
TCP_OPT_TCPCOMP = 26 # TCP compression filter
TCP_OPT_MAX = 27
def parse_opts(buf):
"""Parse TCP option buffer into a list of (option, data) tuples."""
opts = []
while buf:
o = compat_ord(buf[0])
if o > TCP_OPT_NOP:
try:
# advance buffer at least 2 bytes = 1 type + 1 length
l_ = max(2, compat_ord(buf[1]))
d, buf = buf[2:l_], buf[l_:]
except (IndexError, ValueError):
# print 'bad option', repr(str(buf))
opts.append(None) # XXX
break
else:
# options 0 and 1 are not followed by length byte
d, buf = b'', buf[1:]
opts.append((o, d))
return opts
def test_parse_opts():
# normal scenarios
buf = b'\x02\x04\x23\x00\x01\x01\x04\x02'
opts = parse_opts(buf)
assert opts == [
(TCP_OPT_MSS, b'\x23\x00'),
(TCP_OPT_NOP, b''),
(TCP_OPT_NOP, b''),
(TCP_OPT_SACKOK, b'')
]
buf = b'\x01\x01\x05\x0a\x37\xf8\x19\x70\x37\xf8\x29\x78'
opts = parse_opts(buf)
assert opts == [
(TCP_OPT_NOP, b''),
(TCP_OPT_NOP, b''),
(TCP_OPT_SACK, b'\x37\xf8\x19\x70\x37\xf8\x29\x78')
]
# test a zero-length option
buf = b'\x02\x00\x01'
opts = parse_opts(buf)
assert opts == [
(TCP_OPT_MSS, b''),
(TCP_OPT_NOP, b'')
]
# test a one-byte malformed option
buf = b'\xff'
opts = parse_opts(buf)
assert opts == [None]
def test_offset():
tcpheader = TCP(b'\x01\xbb\xc0\xd7\xb6\x56\xa8\xb9\xd1\xac\xaa\xb1\x50\x18\x40\x00\x56\xf8\x00\x00')
assert tcpheader.off == 5
# test setting header offset
tcpheader.off = 8
assert bytes(tcpheader) == b'\x01\xbb\xc0\xd7\xb6\x56\xa8\xb9\xd1\xac\xaa\xb1\x80\x18\x40\x00\x56\xf8\x00\x00'
def test_tcp_flags_to_str():
assert tcp_flags_to_str(0x18) == 'PUSH,ACK'
assert tcp_flags_to_str(0x12) == 'SYN,ACK'
# for code coverage
assert tcp_flags_to_str(0x1ff) == 'FIN,SYN,RST,PUSH,ACK,URG,ECE,CWR,NS'
def test_tcp_unpack():
data = (b'\x00\x50\x0d\x2c\x11\x4c\x61\x8b\x38\xaf\xfe\x14\x70\x12\x16\xd0'
b'\x5b\xdc\x00\x00\x02\x04\x05\x64\x01\x01\x04\x02')
tcp = TCP(data)
assert tcp.flags == (TH_SYN | TH_ACK)
assert tcp.off == 7
assert tcp.win == 5840
assert tcp.dport == 3372
assert tcp.seq == 290218379
assert tcp.ack == 951057940
def test_tcp_pack():
tcp = TCP(
sport=3372,
dport=80,
seq=951057939,
ack=0,
off=7,
flags=TH_SYN,
win=8760,
sum=0xc30c,
urp=0,
opts=b'\x02\x04\x05\xb4\x01\x01\x04\x02'
)
assert bytes(tcp) == (
b'\x0d\x2c\x00\x50\x38\xaf\xfe\x13\x00\x00\x00\x00\x70\x02\x22\x38'
b'\xc3\x0c\x00\x00\x02\x04\x05\xb4\x01\x01\x04\x02')
# TODO: add checksum calculation
|