File: iprangeparse.py

package info (click to toggle)
debtorrent 0.1.9
  • links: PTS, VCS
  • area: main
  • in suites: lenny
  • size: 1,452 kB
  • ctags: 1,183
  • sloc: python: 13,526; sh: 274; makefile: 51
file content (378 lines) | stat: -rw-r--r-- 10,969 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
# Written by John Hoffman
# Modified by Cameron Dale
# see LICENSE.txt for license information
#
# $Id: iprangeparse.py 266 2007-08-18 02:06:35Z camrdale-guest $

"""Deal with all types of IP addresses and IP address ranges.

@type logger: C{logging.Logger}
@var logger: the logger to send all log messages to for this module
@type ipv4addrmask: C{long}
@var ipv4addrmask: the address mask used to determine if the IP address 
    in C{long} format is v4 encapsulated in a v6 address

"""

from bisect import bisect, insort
import logging

logger = logging.getLogger('DebTorrent.iprangeparse')

def to_long_ipv4(ip):
    """Convert an IP address from a string to a long.
    
    @type ip: C{string}
    @param ip: the IP address
    @rtype: C{long}
    @return: the same IP address
    @raises ValueError: if the input IP address is poorly formatted
    
    """
    
    ip = ip.split('.')
    if len(ip) != 4:
        raise ValueError, "bad address"
    b = 0L
    for n in ip:
        b *= 256
        b += int(n)
    return b


def to_long_ipv6(ip):
    """Convert an IPv6 address from a string to a long.
    
    @type ip: C{string}
    @param ip: the IPv6 address
    @rtype: C{long}
    @return: the same IPv6 address
    @raises ValueError: if the input IPv6 address is poorly formatted
    
    """
    
    if ip == '':
        raise ValueError, "bad address"
    if ip == '::':      # boundary handling
        ip = ''
    elif ip[:2] == '::':
        ip = ip[1:]
    elif ip[0] == ':':
        raise ValueError, "bad address"
    elif ip[-2:] == '::':
        ip = ip[:-1]
    elif ip[-1] == ':':
        raise ValueError, "bad address"

    b = []
    doublecolon = False
    for n in ip.split(':'):
        if n == '':     # double-colon
            if doublecolon:
                raise ValueError, "bad address"
            doublecolon = True
            b.append(None)
            continue
        if n.find('.') >= 0: # IPv4
            n = n.split('.')
            if len(n) != 4:
                raise ValueError, "bad address"
            for i in n:
                b.append(int(i))
            continue
        n = ('0'*(4-len(n))) + n
        b.append(int(n[:2],16))
        b.append(int(n[2:],16))
    bb = 0L
    for n in b:
        if n is None:
            for i in xrange(17-len(b)):
                bb *= 256
            continue
        bb *= 256
        bb += n
    return bb

ipv4addrmask = 65535L*256*256*256*256

class IP_List:
    """Stores mutltiple IP address ranges.
    
    @type ipv4list: C{list} of C{long}
    @ivar ipv4list: the starting IP addresses of the ranges
    @type ipv4dict: C{dictionary} of {C{long}: C{long}}
    @ivar ipv4dict: the IP address ranges, keys are the starting 
        IP addresses, values are the ends of the IP address ranges
    @type ipv6list: C{list} of C{long}
    @ivar ipv6list: the starting IPv6 addresses of the ranges
    @type ipv6dict: C{dictionary} of {C{long}: C{long}}
    @ivar ipv6dict: the IPv6 address ranges, keys are the starting 
        IPv6 addresses, values are the ends of the IPv6 address ranges
    
    """
    
    def __init__(self, entrylist=None):
        """Initialize the instance.
        
        @type entrylist: C{list} of (C{string}, C{string})
        @param entrylist: the IP address ranges to start with
            (optional, defaults to None)
        
        """
        
        self.ipv4list = []  # starts of ranges
        self.ipv4dict = {}  # start: end of ranges
        self.ipv6list = []  # "
        self.ipv6dict = {}  # "

        if entrylist:
            l4 = []
            l6 = []
            for b,e in entrylist:
                assert b <= e
                if b.find(':') < 0:        # IPv4
                    b = to_long_ipv4(b)
                    e = to_long_ipv4(e)
                    l4.append((b,e))
                else:
                    b = to_long_ipv6(b)
                    e = to_long_ipv6(e)
                    bb = b % (256*256*256*256)
                    if bb == ipv4addrmask:
                        b -= bb
                        e -= bb
                        l4.append((b,e))
                    else:
                        l6.append((b,e))
            self._import_ipv4(l4)
            self._import_ipv6(l6)

    def __nonzero__(self):
        """Check whether there are any IP address ranges stored.
        
        @rtype: C{boolean}
        @return: whether there are IP address ranges stored
        
        """
        
        return bool(self.ipv4list or self.ipv6list)


    def append(self, ip_beg, ip_end = None):
        """Add a new IP address range.
        
        @type ip_beg: C{string}
        @param ip_beg: the beginning IP address for the range
        @type ip_end: C{string}
        @param ip_end: the ending IP address for the range
            (optional, defaults to the beginning IP address)
        
        """
        
        if ip_end is None:
            ip_end = ip_beg
        else:
            assert ip_beg <= ip_end
        if ip_beg.find(':') < 0:        # IPv4
            ip_beg = to_long_ipv4(ip_beg)
            ip_end = to_long_ipv4(ip_end)
            l = self.ipv4list
            d = self.ipv4dict
        else:
            ip_beg = to_long_ipv6(ip_beg)
            ip_end = to_long_ipv6(ip_end)
            bb = ip_beg % (256*256*256*256)
            if bb == ipv4addrmask:
                ip_beg -= bb
                ip_end -= bb
                l = self.ipv4list
                d = self.ipv4dict
            else:
                l = self.ipv6list
                d = self.ipv6dict

        p = bisect(l,ip_beg)-1
        if p >= 0:
            while p < len(l):
                range_beg = l[p]
                if range_beg > ip_end+1:
                    done = True
                    break
                range_end = d[range_beg]
                if range_end < ip_beg-1:
                    p += 1
                    if p == len(l):
                        done = True
                        break
                    continue
                # if neither of the above conditions is true, the ranges overlap
                ip_beg = min(ip_beg, range_beg)
                ip_end = max(ip_end, range_end)
                del l[p]
                del d[range_beg]
                break

        insort(l,ip_beg)
        d[ip_beg] = ip_end


    def _import_ipv4(self, entrylist):
        """Initialize an empty IPv4 storage with IPv4 address ranges.
        
        @type entrylist: C{list} of (C{long}, C{long})
        @param entrylist: the IPv4 address ranges to start with
        
        """
        
        assert not self.ipv4list
        if not entrylist:
            return
        entrylist.sort()
        l = []
        b1,e1 = entrylist[0]
        for b2,e2 in entrylist:
            if e1+1 >= b2:
                e1 = max(e1,e2)
            else:
                l.append((b1,e1))
                b1 = b2
                e1 = e2
        l.append((b1,e1))
        self.ipv4list = [b for b,e in l]
        for b,e in l:
            self.ipv4dict[b] = e

    def _import_ipv6(self, entrylist):
        """Initialize an empty IPv6 storage with IPv6 address ranges.
        
        @type entrylist: C{list} of (C{long}, C{long})
        @param entrylist: the IPv6 address ranges to start with
        
        """
        
        assert not self.ipv6list
        if not entrylist:
            return
        entrylist.sort()
        l = []
        b1,e1 = entrylist[0]
        for b2,e2 in entrylist:
            if e1+1 >= b2:
                e1 = max(e1,e2)
            else:
                l.append((b1,e1))
                b1 = b2
                e1 = e2
        l.append((b1,e1))
        self.ipv6list = [b for b,e in l]
        for b,e in l:
            self.ipv6dict[b] = e


    def includes(self, ip):
        """Determine whether the IP address is included in any of the ranges.
        
        @type ip: C{string}
        @param ip: the IP address to check
        @rtype: C{boolean}
        @return: whether the IP address is in one of the ranges
       
        """
        
        if not (self.ipv4list or self.ipv6list):
            return False
        if ip.find(':') < 0:        # IPv4
            ip = to_long_ipv4(ip)
            l = self.ipv4list
            d = self.ipv4dict
        else:
            ip = to_long_ipv6(ip)
            bb = ip % (256*256*256*256)
            if bb == ipv4addrmask:
                ip -= bb
                l = self.ipv4list
                d = self.ipv4dict
            else:
                l = self.ipv6list
                d = self.ipv6dict
        for ip_beg in l[bisect(l,ip)-1:]:
            if ip == ip_beg:
                return True
            ip_end = d[ip_beg]
            if ip > ip_beg and ip <= ip_end:
                return True
        return False

    def read_rangelist(self, file):
        """Parse a file for lists of IPv4 address ranges to initialize the empty storage with.
        
        The file to parse must have lines in the format 'whatever:whatever:ip-ip'.
        The 'whatever' will be ignored, and the IP addresses must be in IPv4 
        format. If the range is only a single IP address, omit the '-' and the 
        second 'ip'. Empty lines will be ignored, as will lines beginning with
        '#' which can be used for comments.
        
        @type file: C{string}
        @param file: the name of the file to parse
        
        """
        
        l = []
        f = open(file, 'r')
        while True:
            line = f.readline()
            if not line:
                break
            line = line.strip()
            if not line or line[0] == '#':
                continue
            line = line.split(':')[-1]
            try:
                ip1,ip2 = line.split('-')
            except:
                ip1 = line
                ip2 = line
            try:
                ip1 = to_long_ipv4(ip1)
                ip2 = to_long_ipv4(ip2)
                assert ip1 <= ip2
            except:
                logger.warning('could not parse IP range: '+line)
            l.append((ip1,ip2))
        f.close()
        self._import_ipv4(l)


def is_ipv4(ip):
    """Determine whether the IP address is in the IPv4 format.
    
    @type ip: C{string}
    @param ip: the IP address to check
    @rtype: C{boolean}
    @return: whether the IP address is for IPv4
   
    """
    
    return ip.find(':') < 0

def is_valid_ip(ip):
    """Determine whether the IP address is a valid IPv4 or IPv6 address.
    
    @type ip: C{string}
    @param ip: the IP address to check
    @rtype: C{boolean}
    @return: whether the IP address is valid
   
    """
    
    try:
        if is_ipv4(ip):
            a = ip.split('.')
            assert len(a) == 4
            for i in a:
                chr(int(i))
            return True
        to_long_ipv6(ip)
        return True
    except:
        return False