#!/usr/bin/env python3
#
# Wireshark - Network traffic analyzer
# By Gerald Combs <gerald@wireshark.org>
# Copyright 1998 Gerald Combs
#
# SPDX-License-Identifier: GPL-2.0-or-later
'''Update the IANA IP registry file.
Make-iana-ip creates a file containing information about IPv4/IPv6 allocation blocks.
'''
import csv
import io
import ipaddress
import os
import re
import sys
import urllib.request, urllib.error, urllib.parse
def exit_msg(msg=None, status=1):
    '''Write an optional message and the module usage text to stderr, then exit.

    Args:
        msg: Text written to stderr (followed by a blank line) ahead of the
            usage text. Nothing is written for it when it is None.
        status: Value passed to sys.exit().

    Raises:
        SystemExit: Always, carrying ``status``.
    '''
    if msg is not None:
        sys.stderr.write(msg + '\n\n')
    # The module docstring is None when docstrings are stripped (python -OO);
    # skip the usage text then instead of failing on ``None + str``.
    if __doc__ is not None:
        sys.stderr.write(__doc__ + '\n')
    sys.exit(status)
def open_url(url):
    '''Return the text of a registry file, read locally or fetched over HTTP.

    Args:
        url: Two-item sequence of (base URL, file name).

    Returns:
        The file contents as a str, ready to be split into lines for
        csv.reader.

    When a command-line argument is present it is used as a directory and
    the file name (url[1]) is read from there. Otherwise the two items are
    joined with '/' and downloaded. A failed download ends the program
    through exit_msg().
    '''
    if len(sys.argv) > 1:
        url_path = os.path.join(sys.argv[1], url[1])
        # Decode the same way as the network path below, and let the context
        # manager close the handle even if read() raises.
        with open(url_path, encoding='UTF-8', errors='replace') as url_fd:
            return url_fd.read()

    url_path = '/'.join(url)
    req_headers = {'User-Agent': 'Wireshark iana-ip'}
    try:
        req = urllib.request.Request(url_path, headers=req_headers)
        with urllib.request.urlopen(req) as response:
            body = response.read().decode('UTF-8', 'replace')
    except Exception:
        # exit_msg() does not return: it terminates via sys.exit().
        exit_msg('Error opening ' + url_path)
    return body
class IPv4SpecialBlock(ipaddress.IPv4Network):
    '''IPv4 network whose str() is a C designated initializer for ``.ipv4``.

    The rendered form is ``{ .ipv4 = { <address>, <netmask> } }`` with both
    values written as zero-padded 32-bit hexadecimal literals.
    '''

    @staticmethod
    def ip_get_subnet_mask(bits):
        '''Return the 32-bit netmask for a prefix length.

        Args:
            bits: Prefix length, 0 through 32 inclusive.

        Returns:
            The netmask as an int (e.g. 8 -> 0xff000000).

        Raises:
            ValueError: If bits is outside 0..32.
        '''
        if not 0 <= bits <= 32:
            raise ValueError("Expected bit mask between 0 and 32")
        # Shift the all-ones word left and drop whatever overflows 32 bits.
        return (0xffffffff << (32 - bits)) & 0xffffffff

    def __str__(self):
        # Format the integer value directly so the result does not depend on
        # IPv4Address.__format__ support for hexadecimal presentation types.
        addr = int(self.network_address)
        mask = self.ip_get_subnet_mask(self.prefixlen)
        return '{{ .ipv4 = {{ {:#010x}, {:#010x} }} }}'.format(addr, mask)
class IPv6SpecialBlock(ipaddress.IPv6Network):
    '''IPv6 network whose str() is a C designated initializer for ``.ipv6``.

    The rendered form is ``{ .ipv6 = { { <16 hex bytes> }, <prefix length> } }``.
    '''

    @staticmethod
    def addr_c_array(byte_array):
        '''Render 16 bytes as a brace-enclosed, comma-separated C array.

        Raises:
            ValueError: If byte_array does not hold exactly 16 items.
        '''
        if len(byte_array) != 16:
            raise ValueError("Expected byte array of length 16")
        hex_octets = ['0x{:02x}'.format(octet) for octet in byte_array]
        return '{ ' + ', '.join(hex_octets) + ' }'

    def __str__(self):
        packed = self.network_address.packed
        return f'{{ .ipv6 = {{ {self.addr_c_array(packed)}, {self.prefixlen} }} }}'
class IPRegistry(list):
    '''List of registry rows already converted to C initializer fragments.

    Each stored row is ``[ip, name, source, destination, forward, glob,
    reserved]``: ``ip`` is kept as passed in, ``name`` is a double-quoted
    string, and the five flags are the strings '1', '0' or '-1'.
    '''

    @staticmethod
    def true_or_false(val):
        '''Map 'True' to '1', 'False' to '0' and anything else to '-1'.'''
        if val == 'True':
            return '1'
        if val == 'False':
            return '0'
        return '-1'

    def append(self, row):
        '''Convert one 10-column registry row and store it.

        Rows whose Termination Date column starts with a digit are dropped.

        Args:
            row: Sequence of (address block, name, RFC, allocation date,
                termination date, source, destination, forwardable,
                globally reachable, reserved-by-protocol).
        '''
        ip, name, _, _, termin_date, source, destination, forward, glob, reserved = row
        # Slice instead of indexing so an empty Termination Date cell is
        # treated as "not terminated" rather than raising IndexError.
        if termin_date[:1].isdigit():
            # skip allocations that have expired
            return
        # Drop bracketed annotations, then quote the name for use in C source.
        name = re.sub(r'\[.*\]', '', name)
        name = '"' + name.replace('"', '\\"') + '"'
        flags = [
            self.true_or_false(flag)
            for flag in (source, destination, forward, glob, reserved)
        ]
        super().append([ip, name] + flags)
class IPv4Registry(IPRegistry):
    '''Registry of IPv4 blocks, emitted as the __ipv4_special_block C array.'''

    @staticmethod
    def ipv4_addr_and_mask(s):
        '''Wrap a CIDR string in an IPv4SpecialBlock.'''
        return IPv4SpecialBlock(s)

    def append(self, row):
        '''Store one entry per address block listed in the row's first column.'''
        remaining_columns = row[1:]
        # some lines contain multiple (comma separated) blocks
        for block_text in row[0].split(','):
            # remove annotations like "1.1.1.1 [2]"
            cidr = block_text.split()[0]
            super().append([self.ipv4_addr_and_mask(cidr)] + remaining_columns)

    def fill(self):
        '''Sort the entries in place and return them as a C array definition.'''
        self.sort()
        opening = '_U_ static const struct ws_iana_ip_special_block __ipv4_special_block[] = {\n'
        entries = [
            ' {{ 4, {}, {}, {}, {}, {}, {}, {} }},\n'.format(*entry)
            for entry in self
        ]
        return opening + ''.join(entries) + '};\n'
class IPv6Registry(IPRegistry):
    '''Registry of IPv6 blocks, emitted as the __ipv6_special_block C array.'''

    @staticmethod
    def ipv6_addr_and_mask(s):
        '''Wrap the first whitespace-separated token of s in an IPv6SpecialBlock.'''
        first_token = s.split()[0]
        return IPv6SpecialBlock(first_token)

    def append(self, row):
        '''Store the row with its first column converted to an IPv6SpecialBlock.'''
        # remove annotations like "1.1.1.1 [2]"
        cidr = row[0].split()[0]
        super().append([self.ipv6_addr_and_mask(cidr)] + row[1:])

    def fill(self):
        '''Sort the entries in place and return them as a C array definition.

        The array is wrapped in DIAG_OFF/DIAG_ON(missing-braces) lines.
        '''
        self.sort()
        entry_template = \
''' {{ 6, {},
{}, {}, {}, {}, {}, {} }},\n'''
        pieces = [
            '// GCC bug?\n',
            'DIAG_OFF(missing-braces)\n',
            '_U_ static const struct ws_iana_ip_special_block __ipv6_special_block[] = {\n',
        ]
        pieces.extend(entry_template.format(*entry) for entry in self)
        pieces.append('};\n')
        pieces.append('DIAG_ON(missing-braces)\n')
        return ''.join(pieces)
# Per address family: 'url' is [base URL, CSV file name] and 'min_entries'
# is the smallest number of parsed entries dump_registry() will accept.
IANA_URLS = {
    'IPv4': {
        'url': [
            "https://www.iana.org/assignments/iana-ipv4-special-registry",
            "iana-ipv4-special-registry-1.csv",
        ],
        'min_entries': 2,
    },
    'IPv6': {
        'url': [
            "https://www.iana.org/assignments/iana-ipv6-special-registry",
            "iana-ipv6-special-registry-1.csv",
        ],
        'min_entries': 2,
    },
}
def dump_registry(db, reg):
    '''Load one IANA registry into reg and return its generated C source.

    Args:
        db: Key into IANA_URLS ('IPv4' or 'IPv6').
        reg: Registry object providing append(row), len() and fill().

    Returns:
        The string produced by reg.fill().

    The program exits through exit_msg() when fewer than the configured
    'min_entries' rows end up in reg.
    '''
    db_url = IANA_URLS[db]['url']
    print('Loading {} data from {}'.format(db, db_url))
    body = open_url(db_url)
    iana_csv = csv.reader(body.splitlines())
    # Pop the title row. The default keeps an empty body from raising
    # StopIteration, so the entry-count check below reports it instead.
    next(iana_csv, None)
    for iana_row in iana_csv:
        # Address Block,Name,RFC,Allocation Date,Termination Date,Source,Destination,Forwardable,Globally Reachable,Reserved-by-Protocol
        # ::1/128,Loopback Address,[RFC4291],2006-02,N/A,False,False,False,False,True
        if not iana_row:
            # csv.reader yields an empty list for a blank line; there is
            # nothing to register for it.
            continue
        reg.append(iana_row)
    min_entries = IANA_URLS[db]['min_entries']
    if len(reg) < min_entries:
        exit_msg("Too few {} entries. Got {}, wanted {}".format(db, len(reg), min_entries))
    return reg.fill()
def main():
    '''Generate epan/iana-ip-data.c from the IPv4 and IPv6 registries.

    The output path is resolved relative to the parent of this script's
    directory. The program exits through exit_msg() if writing fails.
    '''
    script_dir = os.path.dirname(__file__)
    iana_path = os.path.join(script_dir, '..', 'epan', 'iana-ip-data.c')

    file_header = '''\
/*
* This file was generated by running ./tools/make-iana-ip.py.
*
* SPDX-License-Identifier: GPL-2.0-or-later
*/
#include "iana-ip.h"
'''
    # The list is evaluated left to right, so IPv4 is loaded before IPv6.
    sections = [
        file_header,
        dump_registry('IPv4', IPv4Registry()),
        '\n',
        dump_registry('IPv6', IPv6Registry()),
    ]
    iana_data = ''.join(sections)

    try:
        with io.open(iana_path, 'w', encoding='UTF-8') as iana_f:
            iana_f.write(iana_data)
    except Exception:
        exit_msg("Couldn't open \"{}\" file for writing".format(iana_path))


if __name__ == '__main__':
    main()
|