File: threatnet.py

package info (click to toggle)
python-shodan 1.28.0-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 400 kB
  • sloc: python: 2,674; makefile: 150
file content (67 lines) | stat: -rw-r--r-- 2,170 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import requests
import json

from .exception import APIError


class Threatnet:
    """Wrapper around the Threatnet REST and Streaming APIs

    :param key: The Shodan API key that can be obtained from your account page (https://account.shodan.io)
    :type key: str
    :param proxies: Optional proxies mapping passed to ``requests`` for Streaming API connections.
    :type proxies: dict
    :ivar stream: An instance of `shodan.Threatnet.Stream` that provides access to the Streaming API.
    """

    class Stream:
        """Client for the Threatnet feeds served under ``base_url``.

        Every public method is a generator: the HTTP request is only issued
        when the first item is requested, and each non-empty line of the
        response is decoded from JSON before being yielded.
        """

        base_url = 'https://stream.shodan.io'

        def __init__(self, parent, proxies=None):
            """Remember the owning API object and the proxy configuration.

            :param parent: Object exposing the ``api_key`` attribute sent with every request.
            :param proxies: Optional proxies mapping forwarded to ``requests.get``.
            :type proxies: dict
            """
            self.parent = parent
            self.proxies = proxies

        def _create_stream(self, name):
            """Open a streaming HTTP connection to the feed at path ``name``.

            :param name: Path appended to ``base_url`` (e.g. ``/threatnet/events``).
            :type name: str
            :returns: The open ``requests`` response; the caller is responsible for closing it.
            :raises APIError: If the request cannot be made or the server
                answers with a status other than 200. When the error response
                is a JSON document with an ``error`` entry, that text is used
                as the exception message.
            """
            try:
                req = requests.get(self.base_url + name, params={'key': self.parent.api_key},
                                   stream=True, proxies=self.proxies)
            except Exception:
                raise APIError('Unable to contact the Shodan Streaming API')

            if req.status_code != 200:
                message = 'Invalid API key or you do not have access to the Streaming API'
                # The body is fetched lazily (stream=True), so reading it may
                # fail for network reasons as well as for not being JSON or
                # lacking an 'error' entry. Any such failure simply leaves the
                # generic message in place. The APIError itself is raised
                # outside this try block so that it cannot be swallowed by the
                # fallback handler.
                try:
                    message = req.json()['error']
                except Exception:
                    pass
                finally:
                    # A streamed response keeps its connection until closed.
                    req.close()
                raise APIError(message)
            return req

        def _iter_json(self, name):
            """Yield the decoded JSON object of every non-empty line of a feed.

            :param name: Path of the feed, forwarded to ``_create_stream``.
            :type name: str
            :raises APIError: Propagated from ``_create_stream``.
            """
            stream = self._create_stream(name)
            try:
                for line in stream.iter_lines():
                    # Blank lines carry no record and are skipped.
                    if line:
                        yield json.loads(line)
            finally:
                # Runs when the feed ends, when decoding fails, and when the
                # consumer stops iterating early (generator close).
                stream.close()

        def events(self):
            """Iterate over the records of the ``/threatnet/events`` feed."""
            return self._iter_json('/threatnet/events')

        def backscatter(self):
            """Iterate over the records of the ``/threatnet/backscatter`` feed."""
            return self._iter_json('/threatnet/backscatter')

        def activity(self):
            """Iterate over the records of the ``/threatnet/ssh`` feed."""
            return self._iter_json('/threatnet/ssh')

    def __init__(self, key, proxies=None):
        """Initializes the API object.

        :param key: The Shodan API key.
        :type key: str
        :param proxies: Optional proxies mapping used for Streaming API connections.
        :type proxies: dict
        """
        self.api_key = key
        self.base_url = 'https://api.shodan.io'
        self.stream = self.Stream(self, proxies=proxies)