File: client_subnet_authority.py

package info (click to toggle)
oarc-dsc-datatool 1.4.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 492 kB
  • sloc: python: 1,151; xml: 859; sh: 125; makefile: 4
file content (181 lines) | stat: -rw-r--r-- 6,251 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
"""dsc_datatool.generator.client_subnet_authority

See `man dsc-datatool-generator client_subnet_authority`.

Part of dsc_datatool.

:copyright: 2024 OARC, Inc.
"""

import csv
import ipaddress
import logging
from urllib.request import Request, urlopen
from io import StringIO

from dsc_datatool import Generator, Dataset, Dimension, args, encoding


# WHOIS server host name (lower case) -> name of the authority running it.
_whois2rir = dict([
    ('whois.apnic.net', 'APNIC'),
    ('whois.arin.net', 'ARIN'),
    ('whois.ripe.net', 'RIPE NCC'),
    ('whois.lacnic.net', 'LACNIC'),
    ('whois.afrinic.net', 'AFRINIC'),
])

# Designation text (lower case) -> authority name.  Besides exact lookups the
# keys are also used for substring matching, first hit wins, so the order of
# the pairs below is kept as is.
_desig2rir = dict([
    ('apnic', 'APNIC'),
    ('arin', 'ARIN'),
    ('ripe ncc', 'RIPE NCC'),
    ('lacnic', 'LACNIC'),
    ('afrinic', 'AFRINIC'),
    ('iana', 'IANA'),
    ('6to4', 'IANA'),
])

class client_subnet_authority(Generator):
    """Group `client_subnet` counters by the authority holding each subnet.

    The address space tables are loaded once, at construction time, either
    from local CSV files (`csv=`) or by downloading them (`fetch=yes`).
    `process()` then turns every `client_subnet` dataset into a
    `client_subnet_authority` dataset.
    """

    # Maps the network address of an index bucket to a list of
    # {'net': <network>, 'auth': <authority name>} entries; filled by _read().
    auth = None
    # When true, process() skips subnet keys that are not valid IP addresses
    # instead of raising.
    nonstrict = False
    # Prefix length of the index buckets, per IP version.
    _index_prefixlen = {4: 8, 6: 24}


    @staticmethod
    def _mask(address, prefixlen):
        """Return `address` with every bit after the first `prefixlen` cleared."""
        shift = address.max_prefixlen - prefixlen
        return type(address)((int(address) >> shift) << shift)


    def _authority(self, designation, whois, status):
        """Return the authority name for one assignment row.

        The WHOIS server is tried first, then the designation (exact match,
        then substring match), then the special cases that map to IANA.

        Raises Exception when nothing identifies an authority.
        """
        whois = whois.lower()
        if whois in _whois2rir:
            return _whois2rir[whois]

        rir = designation.replace('Administered by ', '').lower()
        if rir in _desig2rir:
            return _desig2rir[rir]
        # No exact match: accept the first known name found inside the text.
        for name, authority in _desig2rir.items():
            if name in rir:
                return authority

        if status == 'RESERVED':
            return 'IANA'
        if designation == 'Segment Routing (SRv6) SIDs':
            # TODO: How to better handle this weird allocation?
            return 'IANA'
        raise Exception('Unknown whois/designation: %r/%r' % (whois, designation))


    def _parse_prefix(self, prefix):
        """Return the network described by the `prefix` column of a row.

        Besides regular CIDR notation this accepts the short
        '<first octet>/<length>' form (for example '001/8'), which is
        expanded to '<first octet>.0.0.0/<length>'.
        """
        try:
            return ipaddress.ip_network(prefix)
        except ValueError:
            first_octet, prefixlen = prefix.split('/')
            return ipaddress.ip_network('%s.0.0.0/%s' % (int(first_octet), prefixlen))


    def _lookup(self, address):
        """Return the authority covering `address`, or '??' if there is none.

        Entries in the address's own bucket are checked first, in the order
        they were read.  A network wider than a bucket is filed under its own
        network address only, so it is found by masking the address down to
        ever shorter prefixes and checking the entries filed under each
        resulting key.
        """
        for prefixlen in range(self._index_prefixlen[address.version], -1, -1):
            key = self._mask(address, prefixlen)
            for entry in self.auth.get(key, ()):
                if address in entry['net']:
                    return entry['auth']
        return '??'


    def _read(self, input):
        """Load address space assignments from CSV lines into `self.auth`.

        `input` is anything `csv.reader` accepts (an open text file, a
        StringIO, ...).  Each row must have the seven columns prefix,
        designation, date, whois, rdap, status and note.  Blank lines and
        rows whose prefix column is 'Prefix' (the header) are skipped.

        Raises Exception when a row's authority can not be determined.
        """
        for row in csv.reader(input):
            if not row:
                # csv.reader returns an empty list for a blank line, which
                # would otherwise break the unpacking below.
                continue
            prefix, designation, _date, whois, _rdap, status, _note = row
            if prefix == 'Prefix':
                continue

            rir = self._authority(designation, whois, status)
            net = self._parse_prefix(prefix)

            key = self._mask(net.network_address, self._index_prefixlen[net.version])
            self.auth.setdefault(key, []).append({'net': net, 'auth': rir})


    def __init__(self, opts):
        """Bootstrap the authority tables according to `opts`.

        Options read here:
          csv       - one file name, or a list of them, to load tables from
          fetch     - 'yes' to download the tables when `csv` is not given
          url       - one URL, or a list of them, to download instead of the
                      defaults
          urlv4     - default IPv4 table URL
          urlv6     - default IPv6 table URL
          nonstrict - if set, invalid subnets are skipped during process()

        Raises Exception when neither `csv` nor `fetch=yes` is given.
        """
        Generator.__init__(self, opts)
        self.auth = {}
        csvs = opts.get('csv', None)
        urlv4 = opts.get('urlv4', 'https://www.iana.org/assignments/ipv4-address-space/ipv4-address-space.csv')
        urlv6 = opts.get('urlv6', 'https://www.iana.org/assignments/ipv6-unicast-address-assignments/ipv6-unicast-address-assignments.csv')
        if opts.get('nonstrict', False):
            self.nonstrict = True

        if csvs:
            if not isinstance(csvs, list):
                csvs = [ csvs ]
            for path in csvs:
                with open(path, newline='', encoding=encoding) as csvfile:
                    self._read(csvfile)
        elif opts.get('fetch', 'no').lower() == 'yes':
            urls = opts.get('url', [ urlv4, urlv6 ])
            if urls and not isinstance(urls, list):
                urls = [ urls ]
            logging.info('bootstrapping client subnet authority using URLs')
            for url in urls:
                logging.info('fetching %s', url)
                # Close the HTTP response as soon as the body has been read.
                with urlopen(Request(url)) as response:
                    text = response.read().decode('utf-8')
                self._read(StringIO(text))
        else:
            raise Exception('No authorities bootstrapped, please specify csv= or fetch=yes')


    def process(self, datasets):
        """Return one `client_subnet_authority` dataset per `client_subnet` dataset.

        The values found two dimension levels down in each `client_subnet`
        dataset are summed per subnet (the skipped and skipped-sum keys are
        ignored) and then summed per authority.  Subnets that no loaded
        network covers are counted under '??'.  Datasets with any other name
        are ignored, and nothing is generated for a dataset without subnets.

        Raises ValueError for a subnet key that is not an IP address, unless
        `nonstrict` is set, in which case the key is skipped.
        """
        gen_datasets = []

        for dataset in datasets:
            if dataset.name != 'client_subnet':
                continue

            subnets = {}
            for d1 in dataset.dimensions:
                for d2 in d1.dimensions:
                    for k, v in d2.values.items():
                        if k == args.skipped_key or k == args.skipped_sum_key:
                            continue

                        if k in subnets:
                            subnets[k] += v
                        else:
                            subnets[k] = v

            auth = {}
            for subnet, count in subnets.items():
                try:
                    ip = ipaddress.ip_address(subnet)
                except ValueError:
                    if not self.nonstrict:
                        raise
                    continue

                name = self._lookup(ip)
                if name in auth:
                    auth[name] += count
                else:
                    auth[name] = count

            if auth:
                authd = Dataset()
                authd.name = 'client_subnet_authority'
                authd.start_time = dataset.start_time
                authd.stop_time = dataset.stop_time
                gen_datasets.append(authd)

                authd1 = Dimension('ClientAuthority')
                authd1.values = auth
                authd.dimensions.append(authd1)

        return gen_datasets


import sys
# Only on Python 3.5: call the base class' __init_subclass__ hook by hand.
if sys.version_info[:2] == (3, 5): # pragma: no cover
    Generator.__init_subclass__(client_subnet_authority)