File: utils.py

package info (click to toggle)
zmap 4.3.4-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,628 kB
  • sloc: ansic: 15,033; python: 1,085; yacc: 125; sh: 120; lex: 28; makefile: 4
file content (202 lines) | stat: -rw-r--r-- 6,074 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
import ipaddress
import re

from random import random
from timeout_decorator import timeout
import typing

import zmap_wrapper


def enumerate_IP_range(ip_range: str) -> typing.List[str]:
    """
    Expands an inclusive IP range into its individual addresses

    Args:
        ip_range (str): IP range in notation "start_ip-end_ip". Whitespace around
            either address is ignored. Both ends must be the same IP version.

    Returns:
        List[str]: Every address from start_ip to end_ip inclusive, in ascending
            order. Empty if start_ip is greater than end_ip.

    Raises:
        ValueError: if ip_range is not two valid addresses separated by a single "-",
            or if the two addresses mix IPv4 and IPv6
    """
    start_text, end_text = ip_range.split("-")
    start_ip = ipaddress.ip_address(start_text.strip())
    end_ip = ipaddress.ip_address(end_text.strip())
    if start_ip.version != end_ip.version:
        raise ValueError(f"IP range {ip_range!r} mixes IPv4 and IPv6 addresses")

    # Rebuild each address with the endpoint's own class: ipaddress.ip_address(int)
    # interprets any integer below 2**32 as IPv4, which would mangle low IPv6 ranges.
    address_type = type(start_ip)
    return [str(address_type(value)) for value in range(int(start_ip), int(end_ip) + 1)]


def test_enumerate_IP_range():
    """A three-address range expands to every address, both endpoints included."""
    actual_ips = enumerate_IP_range("189.23.45.32-189.23.45.34")
    assert actual_ips == ["189.23.45.32", "189.23.45.33", "189.23.45.34"]


def enumerate_port_range(port_range: str) -> typing.List[str]:
    """
    Expands an inclusive port range into its individual ports

    Args:
        port_range (str): Port range in notation "start_port-end_port"

    Returns:
        List[str]: Every port from start_port to end_port inclusive, in ascending
            order, as decimal strings. Empty if start_port is greater than end_port.

    Raises:
        ValueError: if port_range is not two integers separated by a single "-"
    """
    start_port, end_port = port_range.split("-")
    return [str(port) for port in range(int(start_port), int(end_port) + 1)]


def test_enumerate_port_range():
    """A port range expands to every port as a string, both endpoints included."""
    actual_ports = enumerate_port_range("22-25")
    assert actual_ports == ["22", "23", "24", "25"]


## utils
def check_uniqueness_ip_list(ip_list):
    """
    Reports whether a list of IP addresses is free of duplicates

    Args:
        ip_list (List(str)): List of IP addresses

    Returns:
        bool: False if any entry appears more than once, True otherwise
    """
    total_count = len(ip_list)
    distinct_count = len(set(ip_list))
    return total_count == distinct_count


def test_check_uniqueness_ip_list():
    """A repeated IP is reported as non-unique; a single IP is unique."""
    repeated = ["1.1.1.1", "1.1.1.1"]
    single = ["1.1.1.1"]
    assert not check_uniqueness_ip_list(repeated)
    assert check_uniqueness_ip_list(single)


def check_coverage_of_ip_list(ip_list: typing.List[str], subnet: str):
    """
    Checks if a list of IPs covers every host address of a subnet

    Args:
        ip_list (List(str)): List of IP addresses
        subnet (str): subnet in CIDR notation

    Returns:
        bool: True if every address produced by the subnet's hosts() iterator is
            present in ip_list, False otherwise. Only hosts() is consulted, so for
            an IPv4 subnet such as a /30 the network and broadcast addresses are
            not required to be in ip_list.
    """
    network = ipaddress.ip_network(subnet)

    # A set of parsed addresses gives constant-time membership tests below
    listed_addresses = set(map(ipaddress.ip_address, ip_list))

    return all(host in listed_addresses for host in network.hosts())


def test_check_coverage_of_ip_list():
    """A full listing of the /30 is covered; dropping one host address is not."""
    subnet = "192.168.1.0/30"
    complete_listing = ["192.168.1.0", "192.168.1.1", "192.168.1.2", "192.168.1.3"]
    listing_missing_host = ["192.168.1.0", "192.168.1.2", "192.168.1.3"]

    assert check_coverage_of_ip_list(complete_listing, subnet)
    assert not check_coverage_of_ip_list(listing_missing_host, subnet)


def parse_ports_string(port_string) -> typing.List[str]:
    """
    Expands a port specification such as "22-25,80,443" into its individual ports
    Args:
        port_string (str): String of ports in the format "22-25,80,443"
    Returns:
        List[str]: Individual ports as decimal strings, in the order they appear
            in port_string, with each range expanded inclusively
    """
    expanded = []
    # Each match is either a "start-end" range (groups 1 and 2) or a lone port (group 3)
    for found in re.finditer(r'(\d+)-(\d+)|(\d+)', port_string):
        range_start, range_end, lone_port = found.groups()
        if range_start:
            for port in range(int(range_start), int(range_end) + 1):
                expanded.append(str(port))
        else:
            # Round-trip through int so the output is normalised like the range case
            expanded.append(str(int(lone_port)))

    return expanded


def test_parse_ports_string():
    """Single ports, ranges and comma-separated mixes all expand to the expected ports."""
    tests = {
        "22": ["22"],
        "22-25": ["22", "23", "24", "25"],
        "22,80": ["22", "80"],
        "24-28,443,34,8080-8085": ["24", "25", "26", "27", "28", "443", "34", "8080",
                                   "8081", "8082", "8083", "8084", "8085"]
    }
    for port_string, expected_output in tests.items():
        output = parse_ports_string(port_string)
        assert len(output) == len(expected_output), "lists don't match in length"
        # Order-insensitive comparison; sorted() leaves both lists unmodified
        assert sorted(output) == sorted(expected_output), "lists do not match"


@timeout(5)  # bounding the runtime of the test so we don't get a stalled GitHub action should a failure occur in ZMap
def bounded_runtime_test(t: zmap_wrapper.Wrapper):
    """
    Runs a wrapper object under the limit imposed by the @timeout(5) decorator

    Args:
        t (zmap_wrapper.Wrapper): object whose run() method is invoked

    Returns:
        Whatever t.run() returns

    NOTE(review): what happens when the limit expires is decided by
    timeout_decorator (presumably it raises its timeout error after 5 seconds) -
    confirm against that library.
    """
    return t.run()


def write_ips_to_file(num_of_ips, filename, blocklist_path="../../conf/blocklist.conf"):
    """
    Writes random public, non-blocked IPv4 addresses to a file, one per line
    Args:
        num_of_ips (int): Number of distinct IPs to write to the file
        filename (str): File to write the IPs to; it is opened in "w" mode
        blocklist_path (str): File listing the CIDR subnets to avoid. Lines that
            start with "#" or contain no "/" are ignored. The default is a
            relative path, so it is resolved against the current working directory
    Returns:
        Set of the IPs written to the file, as strings
    Raises:
        OSError: if the blocklist cannot be read or filename cannot be written
        ValueError: if a blocklist entry is not a valid network (e.g. host bits set)
    """
    subnet_pattern = re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}/\d{1,2}\b')

    # Parse every blocked subnet exactly once, instead of once per candidate IP
    blocked_networks = []
    with open(blocklist_path, "r") as file:
        for line in file:
            if line.startswith("#") or "/" not in line:
                continue
            # a line may carry trailing commentary, so pull out just the subnet
            found = subnet_pattern.search(line)
            if found:
                blocked_networks.append(ipaddress.ip_network(found.group(0)))

    # keep drawing random addresses until enough distinct acceptable ones exist;
    # the set silently absorbs any repeated draw
    ips = set()
    while len(ips) < num_of_ips:
        candidate = ipaddress.IPv4Address(int(2 ** 32 * random()))
        if not candidate.is_global or candidate.is_reserved:
            continue
        if any(candidate in network for network in blocked_networks):
            # IP is blocked
            continue
        ips.add(str(candidate))

    # write the IPs to a file
    with open(filename, "w") as file:
        file.writelines(ip + "\n" for ip in ips)

    return ips