1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378
|
# Written by John Hoffman
# Modified by Cameron Dale
# see LICENSE.txt for license information
#
# $Id: iprangeparse.py 266 2007-08-18 02:06:35Z camrdale-guest $
"""Deal with all types of IP addresses and IP address ranges.
@type logger: C{logging.Logger}
@var logger: the logger to send all log messages to for this module
@type ipv4addrmask: C{long}
@var ipv4addrmask: the address mask used to determine if the IP address
in C{long} format is v4 encapsulated in a v6 address
"""
from bisect import bisect, insort
import logging
# Module-level logger; every log message emitted by this module goes through it.
logger = logging.getLogger('DebTorrent.iprangeparse')
def to_long_ipv4(ip):
    """Convert a dotted-quad IPv4 address from a string to a long.

    @type ip: C{string}
    @param ip: the IPv4 address, four decimal octets separated by '.'
    @rtype: C{long}
    @return: the same IP address as a 32-bit integer, with the first
        octet in the most significant position
    @raises ValueError: if the input IP address is poorly formatted
        (wrong number of octets, a non-numeric octet, or an octet
        outside the range 0-255)

    """

    octets = ip.split('.')
    if len(octets) != 4:
        raise ValueError("bad address")
    value = 0
    for octet in octets:
        octet = int(octet)      # raises ValueError for non-numeric text
        # An out-of-range octet would spill into the neighbouring octet's
        # bits and silently produce a different address.
        if not 0 <= octet <= 255:
            raise ValueError("bad address")
        value = value * 256 + octet
    return value
def to_long_ipv6(ip):
    """Convert an IPv6 address from a string to a long.

    Accepts colon-separated groups of one to four hexadecimal digits, at
    most one '::' standing for one or more all-zero groups, and an
    optional dotted-quad IPv4 address as the final group.

    @type ip: C{string}
    @param ip: the IPv6 address
    @rtype: C{long}
    @return: the same IPv6 address as a 128-bit integer
    @raises ValueError: if the input IPv6 address is poorly formatted

    """

    if ip == '':
        raise ValueError("bad address")

    # Boundary handling: reduce a leading or trailing '::' to a single ':'
    # so that splitting on ':' yields exactly one empty group for it, and
    # reject a lone ':' at either end.
    if ip == '::':
        ip = ''
    elif ip[:2] == '::':
        ip = ip[1:]
    elif ip[0] == ':':
        raise ValueError("bad address")
    elif ip[-2:] == '::':
        ip = ip[:-1]
    elif ip[-1] == ':':
        raise ValueError("bad address")

    hexdigits = '0123456789abcdefABCDEF'
    groups = ip.split(':')
    last = len(groups) - 1
    octets = []                 # address bytes; None marks the '::' gap
    doublecolon = False
    for pos, group in enumerate(groups):
        if group == '':         # double-colon
            if doublecolon:
                raise ValueError("bad address")
            doublecolon = True
            octets.append(None)
            continue
        if '.' in group:        # embedded IPv4, only valid as last group
            if pos != last:
                raise ValueError("bad address")
            quad = group.split('.')
            if len(quad) != 4:
                raise ValueError("bad address")
            for part in quad:
                part = int(part)
                if not 0 <= part <= 255:
                    raise ValueError("bad address")
                octets.append(part)
            continue
        # strip() leaves a non-empty remainder exactly when the group
        # contains a character that is not a hex digit (sign, '0x', ...).
        if len(group) > 4 or group.strip(hexdigits):
            raise ValueError("bad address")
        word = int(group, 16)
        octets.append(word >> 8)
        octets.append(word & 0xFF)

    pad = 0
    if doublecolon:
        # octets holds one None placeholder, so 17 - len(octets) is the
        # number of zero bytes needed to reach 16; '::' must stand for at
        # least one whole group (two bytes).
        pad = 17 - len(octets)
        if pad < 2:
            raise ValueError("bad address")
    elif len(octets) != 16:
        raise ValueError("bad address")

    value = 0
    for octet in octets:
        if octet is None:
            value <<= 8 * pad
        else:
            value = (value << 8) | octet
    return value
# Upper bits (everything above the low 32) of an IPv4 address encapsulated in
# an IPv6 address, i.e. the value of ::ffff:0.0.0.0.  Written without the
# Python-2-only 'L' suffix; the value is unchanged (65535 * 256**4).
ipv4addrmask = 0xFFFF << 32
class IP_List:
    """Stores multiple IP address ranges.

    Ranges are inclusive at both ends.  Overlapping and directly adjacent
    ranges are merged as they are added, so the stored ranges of each
    address family are disjoint and sorted by starting address.  An IPv6
    range whose two ends are both IPv4 addresses encapsulated in IPv6
    (C{::ffff:a.b.c.d}) is stored with the IPv4 ranges.

    @type ipv4list: C{list} of C{long}
    @ivar ipv4list: the starting IP addresses of the ranges
    @type ipv4dict: C{dictionary} of {C{long}: C{long}}
    @ivar ipv4dict: the IP address ranges, keys are the starting
        IP addresses, values are the ends of the IP address ranges
    @type ipv6list: C{list} of C{long}
    @ivar ipv6list: the starting IPv6 addresses of the ranges
    @type ipv6dict: C{dictionary} of {C{long}: C{long}}
    @ivar ipv6dict: the IPv6 address ranges, keys are the starting
        IPv6 addresses, values are the ends of the IPv6 address ranges

    """

    def __init__(self, entrylist=None):
        """Initialize the instance.

        @type entrylist: C{list} of (C{string}, C{string})
        @param entrylist: the IP address ranges to start with
            (optional, defaults to None)
        @raises ValueError: if an address is poorly formatted
        @raises AssertionError: if a range ends before it begins

        """

        self.ipv4list = []  # starts of ranges
        self.ipv4dict = {}  # start: end of ranges
        self.ipv6list = []  # "
        self.ipv6dict = {}  # "

        if entrylist:
            l4 = []
            l6 = []
            for b, e in entrylist:
                is_v4, b, e = self._parse_range(b, e)
                if is_v4:
                    l4.append((b, e))
                else:
                    l6.append((b, e))
            self._import_ipv4(l4)
            self._import_ipv6(l6)

    def __nonzero__(self):
        """Check whether there are any IP address ranges stored.

        @rtype: C{boolean}
        @return: whether there are IP address ranges stored

        """

        return bool(self.ipv4list or self.ipv6list)

    # Python 3 looks up __bool__ instead of __nonzero__ for truth testing.
    __bool__ = __nonzero__

    def _is_mapped(self, value):
        """Tell whether an IPv6 long is an IPv4 address encapsulated in IPv6.

        @type value: C{long}
        @param value: the IPv6 address
        @rtype: C{boolean}
        @return: whether the bits above the low 32 equal L{ipv4addrmask}

        """

        return (value >> 32) == (ipv4addrmask >> 32)

    def _parse_address(self, ip):
        """Convert one address string to a long and pick its address family.

        @type ip: C{string}
        @param ip: the IPv4 or IPv6 address
        @rtype: (C{boolean}, C{long})
        @return: whether the address belongs with the IPv4 ranges, and its
            value (reduced to 32 bits for IPv4 encapsulated in IPv6)
        @raises ValueError: if the address is poorly formatted

        """

        if ip.find(':') < 0:        # IPv4
            return True, to_long_ipv4(ip)
        value = to_long_ipv6(ip)
        if self._is_mapped(value):
            return True, value & 0xFFFFFFFF
        return False, value

    def _parse_range(self, ip_beg, ip_end):
        """Convert the two ends of a range to longs and pick its family.

        @type ip_beg: C{string}
        @param ip_beg: the beginning IP address for the range
        @type ip_end: C{string}
        @param ip_end: the ending IP address for the range
        @rtype: (C{boolean}, C{long}, C{long})
        @return: whether the range belongs with the IPv4 ranges, followed
            by its beginning and end
        @raises ValueError: if an address is poorly formatted
        @raises AssertionError: if the range ends before it begins

        """

        if ip_beg.find(':') < 0:        # IPv4
            is_v4 = True
            beg = to_long_ipv4(ip_beg)
            end = to_long_ipv4(ip_end)
        else:
            beg = to_long_ipv6(ip_beg)
            end = to_long_ipv6(ip_end)
            # Only a range lying entirely inside the encapsulated-IPv4
            # block can be expressed as an IPv4 range.
            is_v4 = self._is_mapped(beg) and self._is_mapped(end)
            if is_v4:
                beg &= 0xFFFFFFFF
                end &= 0xFFFFFFFF
        # Compare the numeric values: comparing the address strings would
        # order '9.0.0.0' after '10.0.0.0'.
        assert beg <= end, "range ends before it begins"
        return is_v4, beg, end

    def _insert_range(self, starts, ends, beg, end):
        """Add one numeric range to a sorted store, merging as needed.

        Every stored range that overlaps or directly adjoins the new one is
        removed and folded into it.  Relies on the stored ranges being
        disjoint and sorted, which this method preserves.

        @type starts: C{list} of C{long}
        @param starts: the sorted starting addresses of the stored ranges
        @type ends: C{dictionary} of {C{long}: C{long}}
        @param ends: the end of each stored range, keyed by its start
        @type beg: C{long}
        @param beg: the beginning of the range to add
        @type end: C{long}
        @param end: the end of the range to add

        """

        # The last range starting at or before beg is the only earlier one
        # that can reach the new range.
        first = bisect(starts, beg) - 1
        if first < 0 or ends[starts[first]] < beg - 1:
            first += 1
        # Every range starting no later than end+1 overlaps or adjoins.
        last = bisect(starts, end + 1)
        if first < last:
            beg = min(beg, starts[first])
            end = max(end, ends[starts[last - 1]])
            for start in starts[first:last]:
                del ends[start]
            del starts[first:last]
        insort(starts, beg)
        ends[beg] = end

    def _lookup(self, starts, ends, value):
        """Tell whether a numeric address lies inside a stored range.

        @type starts: C{list} of C{long}
        @param starts: the sorted starting addresses of the stored ranges
        @type ends: C{dictionary} of {C{long}: C{long}}
        @param ends: the end of each stored range, keyed by its start
        @type value: C{long}
        @param value: the address to look for
        @rtype: C{boolean}
        @return: whether the address is in one of the ranges

        """

        # Only the last range starting at or before value can contain it.
        pos = bisect(starts, value) - 1
        return pos >= 0 and value <= ends[starts[pos]]

    def _merge_ranges(self, entrylist):
        """Sort numeric ranges and merge the overlapping or adjacent ones.

        @type entrylist: C{list} of (C{long}, C{long})
        @param entrylist: the ranges, in any order (not modified)
        @rtype: C{list} of (C{long}, C{long})
        @return: the merged ranges, sorted by starting address

        """

        merged = []
        for beg, end in sorted(entrylist):
            if merged and merged[-1][1] + 1 >= beg:
                if end > merged[-1][1]:
                    merged[-1] = (merged[-1][0], end)
            else:
                merged.append((beg, end))
        return merged

    def append(self, ip_beg, ip_end = None):
        """Add a new IP address range.

        @type ip_beg: C{string}
        @param ip_beg: the beginning IP address for the range
        @type ip_end: C{string}
        @param ip_end: the ending IP address for the range
            (optional, defaults to the beginning IP address)
        @raises ValueError: if an address is poorly formatted
        @raises AssertionError: if the range ends before it begins

        """

        if ip_end is None:
            ip_end = ip_beg
        is_v4, beg, end = self._parse_range(ip_beg, ip_end)
        if is_v4:
            self._insert_range(self.ipv4list, self.ipv4dict, beg, end)
        else:
            self._insert_range(self.ipv6list, self.ipv6dict, beg, end)

    def _import_ipv4(self, entrylist):
        """Initialize an empty IPv4 storage with IPv4 address ranges.

        @type entrylist: C{list} of (C{long}, C{long})
        @param entrylist: the IPv4 address ranges to start with

        """

        assert not self.ipv4list
        if not entrylist:
            return
        merged = self._merge_ranges(entrylist)
        self.ipv4list = [b for b, e in merged]
        self.ipv4dict.update(merged)

    def _import_ipv6(self, entrylist):
        """Initialize an empty IPv6 storage with IPv6 address ranges.

        @type entrylist: C{list} of (C{long}, C{long})
        @param entrylist: the IPv6 address ranges to start with

        """

        assert not self.ipv6list
        if not entrylist:
            return
        merged = self._merge_ranges(entrylist)
        self.ipv6list = [b for b, e in merged]
        self.ipv6dict.update(merged)

    def includes(self, ip):
        """Determine whether the IP address is included in any of the ranges.

        @type ip: C{string}
        @param ip: the IP address to check
        @rtype: C{boolean}
        @return: whether the IP address is in one of the ranges
        @raises ValueError: if ranges are stored and the address is poorly
            formatted

        """

        if not (self.ipv4list or self.ipv6list):
            return False
        is_v4, value = self._parse_address(ip)
        if is_v4:
            return self._lookup(self.ipv4list, self.ipv4dict, value)
        return self._lookup(self.ipv6list, self.ipv6dict, value)

    def read_rangelist(self, file):
        """Parse a file for lists of IPv4 address ranges to initialize the empty storage with.

        The file to parse must have lines in the format 'whatever:whatever:ip-ip'.
        The 'whatever' will be ignored, and the IP addresses must be in IPv4
        format. If the range is only a single IP address, omit the '-' and the
        second 'ip'. Empty lines will be ignored, as will lines beginning with
        '#' which can be used for comments. Lines that cannot be parsed are
        logged as warnings and skipped.

        @type file: C{string}
        @param file: the name of the file to parse

        """

        ranges = []
        f = open(file, 'r')
        try:
            for line in f:
                line = line.strip()
                if not line or line[0] == '#':
                    continue
                line = line.split(':')[-1]
                parts = line.split('-')
                if len(parts) == 2:
                    ip1, ip2 = parts
                else:
                    ip1 = ip2 = line
                try:
                    beg = to_long_ipv4(ip1)
                    end = to_long_ipv4(ip2)
                except ValueError:
                    beg = end = None
                if beg is None or beg > end:
                    # Skip the line: storing it would put unparsed text or
                    # a reversed range into the range list.
                    logger.warning('could not parse IP range: %s', line)
                    continue
                ranges.append((beg, end))
        finally:
            f.close()
        self._import_ipv4(ranges)
def is_ipv4(ip):
    """Determine whether the IP address is written in the IPv4 format.

    The check is purely syntactic: an address counts as IPv4 when it
    contains no ':' character.  The address is not validated.

    @type ip: C{string}
    @param ip: the IP address to check
    @rtype: C{boolean}
    @return: whether the IP address is for IPv4

    """

    colon_at = ip.find(':')
    return colon_at == -1
def is_valid_ip(ip):
    """Determine whether the IP address is a valid IPv4 or IPv6 address.

    An IPv4 address must consist of exactly four decimal octets in the
    range 0-255.  An IPv6 address is valid when L{to_long_ipv6} can
    convert it.

    @type ip: C{string}
    @param ip: the IP address to check
    @rtype: C{boolean}
    @return: whether the IP address is valid

    """

    try:
        if is_ipv4(ip):
            octets = ip.split('.')
            # Checked explicitly rather than with assert, which is
            # stripped when Python runs with -O.
            if len(octets) != 4:
                return False
            for octet in octets:
                if not 0 <= int(octet) <= 255:
                    return False
            return True
        to_long_ipv6(ip)
        return True
    except (ValueError, TypeError, AttributeError):
        # Malformed text (ValueError) or an argument that is not a string.
        return False
|