File: helpers.py
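
"""Helper functions for talking to the Shodan API and for working with Shodan data files."""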

import gzip
import json

import requests

from .exception import APIError

# Python 2/3 compatibility: Python 3 has no basestring, so treat it as an alias for str
try:
    basestring
except NameError:
    basestring = str


def create_facet_string(facets):
    """Converts a Python list of facets into a comma-separated string that can be understood by
    the Shodan API.
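
    Each facet can be a plain string or a (facet, count) tuple. Example
    (illustrative):

    >>> create_facet_string(['country', ('port', 10)])
    'country,port:10'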
    """
    facet_str = ''
    for facet in facets:
        if isinstance(facet, basestring):
            facet_str += facet
        else:
            facet_str += '{}:{}'.format(facet[0], facet[1])
        facet_str += ','
    return facet_str[:-1]


def api_request(key, function, params=None, data=None, base_url='https://api.shodan.io',
                method='get', retries=1, proxies=None):
    """General-purpose function to create web requests to SHODAN.

    Arguments:
        function  -- name of the function you want to execute
        params    -- dictionary of parameters for the function
        proxies   -- a proxies array for the requests library

    Returns
        A dictionary containing the function's results.
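
    Example (illustrative; requires a valid API key and network access)::

        info = api_request('YOUR_API_KEY', '/api-info')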

    """
    # Add the API key parameter automatically; params may be omitted by the caller
    if params is None:
        params = {}
    params['key'] = key

    # Send the request
    tries = 0
    error = False
    while tries <= retries:
        try:
            if method.lower() == 'post':
                data = requests.post(base_url + function, json.dumps(data), params=params,
                                     headers={'content-type': 'application/json'},
                                     proxies=proxies)
            elif method.lower() == 'delete':
                data = requests.delete(base_url + function, params=params, proxies=proxies)
            elif method.lower() == 'put':
                data = requests.put(base_url + function, params=params, proxies=proxies)
            else:
                data = requests.get(base_url + function, params=params, proxies=proxies)

            # Exit out of the loop
            break
        except Exception:
            error = True
            tries += 1

    # Give up only if every attempt (the initial try plus all retries) failed
    if error and tries > retries:
        raise APIError('Unable to connect to Shodan')

    # Check that the API key wasn't rejected
    if data.status_code == 401:
        try:
            raise APIError(data.json()['error'])
        except (ValueError, KeyError):
            pass
        raise APIError('Invalid API key')

    # Parse the text into JSON
    try:
        data = data.json()
    except Exception:
        raise APIError('Unable to parse JSON response')

    # Raise an exception if an error occurred
    if isinstance(data, dict) and data.get('error', None):
        raise APIError(data['error'])

    # Return the data
    return data


def iterate_files(files, fast=False):
    """Loop over all the records of the provided Shodan output file(s)."""
    loads = json.loads
    if fast:
        # Try to use ujson for parsing JSON if it's available and the user requested faster throughput.
        # It's significantly faster at encoding/decoding JSON but it doesn't support as
        # many options as the standard library. As such, we're mostly interested in using it for
        # decoding, since reading/parsing files will use up the most time.
        # pylint: disable=E0401
        try:
            from ujson import loads
        except Exception:
            pass

    if isinstance(files, basestring):
        files = [files]

    for filename in files:
        # Create a file handle depending on the filetype
        if filename.endswith('.gz'):
            fin = gzip.open(filename, 'r')
        else:
            fin = open(filename, 'r')

        for line in fin:
            # Ensure the line has been decoded into a string to prevent errors w/ Python3
            if not isinstance(line, basestring):
                line = line.decode('utf-8')

            # Convert the JSON into a native Python object
            banner = loads(line)
            yield banner
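

# Illustrative usage of iterate_files (the filename below is hypothetical):
#
#     for banner in iterate_files('shodan-export.json.gz'):
#         print(get_ip(banner))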


def get_screenshot(banner):
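    """Return the screenshot stored in a banner's 'opts' field, or None if there is no screenshot."""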
    if 'opts' in banner and 'screenshot' in banner['opts']:
        return banner['opts']['screenshot']
    return None


def get_ip(banner):
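    """Return the IP address of a banner, preferring the IPv6 address when one is present."""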
    if 'ipv6' in banner:
        return banner['ipv6']
    return banner['ip_str']


def open_file(filename, mode='a', compresslevel=9):
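    """Open a gzip-compressed file for storing banners (append mode and maximum compression by default)."""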
    return gzip.open(filename, mode, compresslevel)


def write_banner(fout, banner):
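    """Serialize a banner to JSON and write it to the output file as a UTF-8 encoded line."""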
    line = json.dumps(banner) + '\n'
    fout.write(line.encode('utf-8'))
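

# Illustrative usage of open_file and write_banner (the filename and banner are hypothetical):
#
#     fout = open_file('banners.json.gz')
#     write_banner(fout, {'ip_str': '198.51.100.1', 'port': 80})
#     fout.close()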


def humanize_bytes(byte_count, precision=1):
    """Return a humanized string representation of a number of bytes.
    >>> humanize_bytes(1)
    '1 byte'
    >>> humanize_bytes(1024)
    '1.0 KB'
    >>> humanize_bytes(1024*123)
    '123.0 KB'
    >>> humanize_bytes(1024*12342)
    '12.1 MB'
    >>> humanize_bytes(1024*12342,2)
    '12.05 MB'
    >>> humanize_bytes(1024*1234,2)
    '1.21 MB'
    >>> humanize_bytes(1024*1234*1111,2)
    '1.31 GB'
    >>> humanize_bytes(1024*1234*1111,1)
    '1.3 GB'
    """
    if byte_count == 1:
        return '1 byte'
    if byte_count < 1024:
        return '{0:0.{1}f} {2}'.format(byte_count, 0, 'bytes')

    suffixes = ['KB', 'MB', 'GB', 'TB', 'PB']
    multiple = 1024.0  # .0 to force float on python 2
    for suffix in suffixes:
        byte_count /= multiple
        if byte_count < multiple:
            return '{0:0.{1}f} {2}'.format(byte_count, precision, suffix)
    return '{0:0.{1}f} {2}'.format(byte_count, precision, suffix)