File: helpers.py

package info (click to toggle)
python-shodan 1.28.0-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 400 kB
  • sloc: python: 2,674; makefile: 150
file content (129 lines) | stat: -rw-r--r-- 3,730 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
'''
Helper methods used across the CLI commands.
'''
import click
import datetime
import gzip
import itertools
import os
import sys
from ipaddress import ip_network, ip_address

from .settings import SHODAN_CONFIG_DIR

try:
    basestring            # Python 2
except NameError:
    basestring = (str, )  # Python 3


def get_api_key():
    '''Returns the API key of the current logged-in user.'''
    shodan_dir = os.path.expanduser(SHODAN_CONFIG_DIR)
    keyfile = shodan_dir + '/api_key'

    # If the file doesn't yet exist let the user know that they need to
    # initialize the shodan cli
    if not os.path.exists(keyfile):
        raise click.ClickException('Please run "shodan init <api key>" before using this command')

    # Make sure it is a read-only file
    if not oct(os.stat(keyfile).st_mode).endswith("600"):
        os.chmod(keyfile, 0o600)

    with open(keyfile, 'r') as fin:
        return fin.read().strip()


def escape_data(args):
    # Make sure the string is unicode so the terminal can properly display it
    # We do it using format() so it works across Python 2 and 3
    args = u'{}'.format(args)
    return args.replace('\n', '\\n').replace('\r', '\\r').replace('\t', '\\t')


def timestr():
    return datetime.datetime.utcnow().strftime('%Y-%m-%d')


def open_streaming_file(directory, timestr, compresslevel=9):
    return gzip.open('{}/{}.json.gz'.format(directory, timestr), 'a', compresslevel)


def get_banner_field(banner, flat_field):
    # The provided field is a collapsed form of the actual field
    fields = flat_field.split('.')

    try:
        current_obj = banner
        for field in fields:
            current_obj = current_obj[field]
        return current_obj
    except Exception:
        pass

    return None


def filter_with_netmask(banner, netmask):
    # filtering based on netmask is a more abstract concept than
    # a mere check for a specific field and thus needs its own mechanism
    # this will enable users to use the net:10.0.0.0/8 syntax they are used to
    # to find specific networks from a big shodan download.
    network = ip_network(netmask)
    ip_field = get_banner_field(banner, 'ip')
    if not ip_field:
        return False
    banner_ip_address = ip_address(ip_field)
    return banner_ip_address in network


def match_filters(banner, filters):
    for args in filters:
        flat_field, check = args.split(':', 1)
        if flat_field == 'net':
            return filter_with_netmask(banner, check)

        value = get_banner_field(banner, flat_field)

        # If the field doesn't exist on the banner then ignore the record
        if not value:
            return False

        # It must match all filters to be allowed
        field_type = type(value)

        # For lists of strings we see whether the desired value is contained in the field
        if field_type == list or isinstance(value, basestring):
            if check not in value:
                return False
        elif field_type == int:
            if int(check) != value:
                return False
        elif field_type == float:
            if float(check) != value:
                return False
        else:
            # Ignore unknown types
            pass

    return True


def async_spinner(finished):
    spinner = itertools.cycle(['-', '/', '|', '\\'])
    while not finished.is_set():
        sys.stdout.write('\b{}'.format(next(spinner)))
        sys.stdout.flush()
        finished.wait(0.2)


def humanize_api_plan(plan):
    return {
        'oss': 'Free',
        'dev': 'Membership',
        'basic': 'Freelancer API',
        'plus': 'Small Business API',
        'corp': 'Corporate API',
        'stream-100': 'Enterprise',
    }[plan]