File: stream.py

package info (click to toggle)
python-shodan 1.28.0-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 400 kB
  • sloc: python: 2,674; makefile: 150
file content (162 lines) | stat: -rw-r--r-- 6,316 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import requests
import json
import ssl

from .exception import APIError


class Stream:
    """Client for the Shodan Streaming API endpoints served under ``base_url``.

    Every public method is a generator: it opens a streaming HTTP request and
    yields one item per non-empty line received. With ``raw`` set to a true
    value the line is yielded exactly as received, otherwise it is decoded
    with ``json.loads`` first.

    For every method a ``timeout`` of ``None``, ``0`` or a negative number
    means "no timeout".
    """

    # Root URL that each endpoint path is appended to.
    base_url = 'https://stream.shodan.io'

    # Status code 524 is special to Cloudflare. It means that no data was sent
    # from the streaming servers which caused Cloudflare to terminate the
    # connection.
    _CLOUDFLARE_TIMEOUT = 524

    def __init__(self, api_key, proxies=None):
        """
        :param api_key: Value sent as the ``key`` parameter of every request.
        :param proxies: Optional value forwarded as ``proxies`` to ``requests.get``.
        """
        self.api_key = api_key
        self.proxies = proxies

    @staticmethod
    def _normalize_timeout(timeout):
        """Return ``None`` for ``None``, zero or negative timeouts; any other value unchanged."""
        if timeout is not None and timeout <= 0:
            return None
        return timeout

    @staticmethod
    def _should_retry(status_code, timeout):
        """Return True when a request should be re-issued: Cloudflare 524 and no timeout in effect."""
        # The previous check compared ``timeout >= 0``. With ``timeout=None`` that raises
        # TypeError on Python 3, which got reported as "Unable to contact the Shodan
        # Streaming API" instead of retrying. Test for None explicitly.
        return status_code == Stream._CLOUDFLARE_TIMEOUT and timeout is None

    def _create_stream(self, name, query=None, timeout=None):
        """Open a streaming GET request against ``base_url + name``.

        :param name: Endpoint path appended to ``base_url`` (e.g. ``/shodan/banners``).
        :param query: Optional value sent as the ``query`` request parameter.
        :param timeout: Timeout handed to ``requests.get``; ``None``, ``0`` and negative
            values all disable it.
        :returns: The open ``requests`` response, whose status code is 200.
        :raises APIError: If the request could not be made or the status code is not 200.
        """
        # The user doesn't want to use a timeout
        # If the timeout is specified as 0 then we also don't want to have a timeout
        timeout = self._normalize_timeout(timeout)

        params = {
            'key': self.api_key,
        }

        # If the user requested a timeout then we need to disable heartbeat messages
        # which are intended to keep stream connections alive even if there isn't any data
        # flowing through.
        if timeout is not None:
            params['heartbeat'] = False

        if query is not None:
            params['query'] = query

        stream_url = self.base_url + name

        try:
            while True:
                req = requests.get(stream_url, params=params, stream=True, timeout=timeout,
                                   proxies=self.proxies)

                # We only want to exit if there was a timeout specified or the HTTP status code is
                # not specific to Cloudflare.
                if not self._should_retry(req.status_code, timeout):
                    break

                # The response was opened with stream=True and is being abandoned:
                # close it so its connection is released before trying again.
                req.close()
        except Exception:
            raise APIError('Unable to contact the Shodan Streaming API')

        if req.status_code != 200:
            # Prefer the error message supplied by the API. Anything that goes wrong while
            # extracting it (invalid JSON, missing "error" key, ...) falls back to a generic one.
            try:
                message = json.loads(req.text)['error']
            except Exception:
                message = 'Invalid API key or you do not have access to the Streaming API'
            finally:
                # The failed response is not handed to the caller, so release it here.
                req.close()
            raise APIError(message)

        if req.encoding is None:
            req.encoding = 'utf-8'
        return req

    def _iter_stream(self, stream, raw):
        """Yield the non-empty lines of ``stream``.

        :param stream: Object whose ``iter_lines()`` yields the received lines
            (the response returned by ``_create_stream``).
        :param raw: When true yield each line untouched, otherwise yield ``json.loads(line)``.
        """
        for line in stream.iter_lines():
            # The Streaming API sends out heartbeat messages that are newlines
            # We want to ignore those messages since they don't contain any data
            if not line:
                continue
            yield line if raw else json.loads(line)

    def alert(self, aid=None, timeout=None, raw=False):
        """
        Stream from ``/shodan/alert/<aid>`` when ``aid`` is truthy, otherwise from ``/shodan/alert``.

        Note that, unlike the other methods, ``timeout`` comes before ``raw`` here.

        :param aid: Optional alert identifier appended to the endpoint path.
        :param timeout: See the class description.
        :param raw: Yield unparsed lines instead of decoded JSON.
        :raises APIError: Also raised with 'Stream timed out' when reading the stream fails
            with a ``requests`` connection error or an ``ssl.SSLError``.
        """
        if aid:
            stream = self._create_stream('/shodan/alert/{}'.format(aid), timeout=timeout)
        else:
            stream = self._create_stream('/shodan/alert', timeout=timeout)

        try:
            for line in self._iter_stream(stream, raw):
                yield line
        except (requests.exceptions.ConnectionError, ssl.SSLError):
            raise APIError('Stream timed out')

    def asn(self, asn, raw=False, timeout=None):
        """
        A filtered version of the "banners" stream to only return banners that match the ASNs of interest.

        :param asn: A list of ASN to return banner data on.
        :type asn: string[]
        :param raw: Yield unparsed lines instead of decoded JSON.
        :param timeout: See the class description.
        """
        stream = self._create_stream('/shodan/asn/{}'.format(','.join(asn)), timeout=timeout)
        for line in self._iter_stream(stream, raw):
            yield line

    def banners(self, raw=False, timeout=None):
        """A real-time feed of the data that Shodan is currently collecting. Note that this is only available to
        API subscription plans and for those it only returns a fraction of the data.

        :param raw: Yield unparsed lines instead of decoded JSON.
        :param timeout: See the class description.
        """
        stream = self._create_stream('/shodan/banners', timeout=timeout)
        for line in self._iter_stream(stream, raw):
            yield line

    def countries(self, countries, raw=False, timeout=None):
        """
        A filtered version of the "banners" stream to only return banners that match the countries of interest.

        :param countries: A list of countries to return banner data on.
        :type countries: string[]
        :param raw: Yield unparsed lines instead of decoded JSON.
        :param timeout: See the class description.
        """
        stream = self._create_stream('/shodan/countries/{}'.format(','.join(countries)), timeout=timeout)
        for line in self._iter_stream(stream, raw):
            yield line

    def custom(self, query, raw=False, timeout=None):
        """
        A filtered version of the "banners" stream to only return banners that match the query of interest. The query
        can vary and mix-match with different arguments (ports, tags, vulns, etc).

        :param query: A space-separated list of key:value filters query to return banner data on.
        :type query: string
        :param raw: Yield unparsed lines instead of decoded JSON.
        :param timeout: See the class description.
        """
        stream = self._create_stream('/shodan/custom', query=query, timeout=timeout)
        for line in self._iter_stream(stream, raw):
            yield line

    def ports(self, ports, raw=False, timeout=None):
        """
        A filtered version of the "banners" stream to only return banners that match the ports of interest.

        :param ports: A list of ports to return banner data on.
        :type ports: int[]
        :param raw: Yield unparsed lines instead of decoded JSON.
        :param timeout: See the class description.
        """
        # Ports are numbers, so each one is converted to text before joining.
        port_list = ','.join(str(port) for port in ports)
        stream = self._create_stream('/shodan/ports/{}'.format(port_list), timeout=timeout)
        for line in self._iter_stream(stream, raw):
            yield line

    def tags(self, tags, raw=False, timeout=None):
        """
        A filtered version of the "banners" stream to only return banners that match the tags of interest.

        :param tags: A list of tags to return banner data on.
        :type tags: string[]
        :param raw: Yield unparsed lines instead of decoded JSON.
        :param timeout: See the class description.
        """
        stream = self._create_stream('/shodan/tags/{}'.format(','.join(tags)), timeout=timeout)
        for line in self._iter_stream(stream, raw):
            yield line

    def vulns(self, vulns, raw=False, timeout=None):
        """
        A filtered version of the "banners" stream to only return banners that match the vulnerabilities of interest.

        :param vulns: A list of vulns to return banner data on.
        :type vulns: string[]
        :param raw: Yield unparsed lines instead of decoded JSON.
        :param timeout: See the class description.
        """
        stream = self._create_stream('/shodan/vulns/{}'.format(','.join(vulns)), timeout=timeout)
        for line in self._iter_stream(stream, raw):
            yield line