File: baseapi.py

package info (click to toggle)
python-digitalocean 1.16.0-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, sid
  • size: 912 kB
  • sloc: python: 4,961; makefile: 46
file content (253 lines) | stat: -rw-r--r-- 8,127 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
# -*- coding: utf-8 -*-
import os
import json
import logging
import requests
from . import __name__, __version__
try:
    import urlparse
except ImportError:
    from urllib import parse as urlparse


GET = 'GET'
POST = 'POST'
DELETE = 'DELETE'
PUT = 'PUT'
PATCH = 'PATCH'
REQUEST_TIMEOUT_ENV_VAR = 'PYTHON_DIGITALOCEAN_REQUEST_TIMEOUT_SEC'


class Error(Exception):
    """Base exception class for this module"""
    pass


class TokenError(Error):
    pass


class DataReadError(Error):
    pass


class JSONReadError(Error):
    pass


class NotFoundError(Error):
    pass


class EndPointError(Error):
    pass


class BaseAPI(object):
    """
        Basic api class for
    """
    tokens = []
    _last_used = 0
    end_point = "https://api.digitalocean.com/v2/"

    def __init__(self, *args, **kwargs):
        self.token = os.getenv("DIGITALOCEAN_ACCESS_TOKEN", "")
        self.end_point = os.getenv("DIGITALOCEAN_END_POINT", "https://api.digitalocean.com/v2/")
        self._log = logging.getLogger(__name__)

        self._session = requests.Session()

        for attr in kwargs.keys():
            setattr(self, attr, kwargs[attr])

        parsed_url = urlparse.urlparse(self.end_point)
        if not parsed_url.scheme or not parsed_url.netloc:
            raise EndPointError("Provided end point is not a valid URL. Please use a valid URL")

        if not parsed_url.path:
            self.end_point += '/'

    def __getstate__(self):
        state = self.__dict__.copy()
        # The logger is not pickleable due to using thread.lock
        del state['_log']
        return state

    def __setstate__(self, state):
        self.__dict__ = state
        self._log = logging.getLogger(__name__)

    def __perform_request(self, url, type=GET, params=None):
        """
            This method will perform the real request,
            in this way we can customize only the "output" of the API call by
            using self.__call_api method.
            This method will return the request object.
        """
        if params is None:
            params = {}

        if not self.token:
            raise TokenError("No token provided. Please use a valid token")

        url = urlparse.urljoin(self.end_point, url)

        # lookup table to find out the appropriate requests method,
        # headers and payload type (json or query parameters)
        identity = lambda x: x
        json_dumps = lambda x: json.dumps(x)
        lookup = {
            GET: (self._session.get, {'Content-type': 'application/json'}, 'params', identity),
            PATCH: (requests.patch, {'Content-type': 'application/json'},
                    'data', json_dumps),
            POST: (requests.post, {'Content-type': 'application/json'}, 'data',
                   json_dumps),
            PUT: (self._session.put, {'Content-type': 'application/json'}, 'data',
                  json_dumps),
            DELETE: (self._session.delete,
                     {'content-type': 'application/json'},
                     'data', json_dumps),
        }

        requests_method, headers, payload, transform = lookup[type]
        agent = "{0}/{1} {2}/{3}".format('python-digitalocean',
                                         __version__,
                                         requests.__name__,
                                         requests.__version__)
        headers.update({'Authorization': 'Bearer ' + self.token,
                        'User-Agent': agent})
        kwargs = {'headers': headers, payload: transform(params)}

        timeout = self.get_timeout()
        if timeout:
            kwargs['timeout'] = timeout

        # remove token from log
        headers_str = str(headers)
        for i, token in enumerate(self.tokens):
            headers_str = headers_str.replace(token.strip(), 'TOKEN%s' % i)
        self._log.debug('%s %s %s:%s %s %s' %
                        (type, url, payload, params, headers_str, timeout))

        return requests_method(url, **kwargs)

    def __deal_with_pagination(self, url, method, params, data):
        """
            Perform multiple calls in order to have a full list of elements
            when the API are "paginated". (content list is divided in more
            than one page)
        """
        all_data = data
        while data.get("links", {}).get("pages", {}).get("next"):
            url, query = data["links"]["pages"]["next"].split("?", 1)

            # Merge the query parameters
            for key, value in urlparse.parse_qs(query).items():
                params[key] = value

            data = self.__perform_request(url, method, params).json()

            # Merge the dictionaries
            for key, value in data.items():
                if isinstance(value, list) and key in all_data:
                    all_data[key] += value
                else:
                    all_data[key] = value

        return all_data

    def __init_ratelimit(self, headers):
        # Add the account requests/hour limit
        self.ratelimit_limit = headers.get('Ratelimit-Limit', None)
        # Add the account requests remaining
        self.ratelimit_remaining = headers.get('Ratelimit-Remaining', None)
        # Add the account requests limit reset time
        self.ratelimit_reset = headers.get('Ratelimit-Reset', None)

    @property
    def token(self):
        # use all the tokens round-robin style
        if self.tokens:
            self._last_used = (self._last_used + 1) % len(self.tokens)
            return self.tokens[self._last_used]
        return ""

    @token.setter
    def token(self, token):
        self._last_used = 0
        if isinstance(token, list):
            self.tokens = token
        else:
            # for backward compatibility
            self.tokens = [token]

    def get_timeout(self):
        """
            Checks if any timeout for the requests to DigitalOcean is required.
            To set a timeout, use the REQUEST_TIMEOUT_ENV_VAR environment
            variable.
        """
        timeout_str = os.environ.get(REQUEST_TIMEOUT_ENV_VAR)
        if timeout_str:
            try:
                return float(timeout_str)
            except:
                self._log.error('Failed parsing the request read timeout of '
                                '"%s". Please use a valid float number!' %
                                        timeout_str)
        return None

    def get_data(self, url, type=GET, params=None):
        """
            This method is a basic implementation of __call_api that checks
            errors too. In case of success the method will return True or the
            content of the response to the request.

            Pagination is automatically detected and handled accordingly
        """
        if params is None:
            params = dict()

        # If per_page is not set, make sure it has a sane default
        if type is GET:
            params.setdefault("per_page", 200)

        req = self.__perform_request(url, type, params)
        if req.status_code == 204:
            return True

        if req.status_code == 404:
            raise NotFoundError()

        try:
            data = req.json()
        except ValueError as e:
            raise JSONReadError(
                'Read failed from DigitalOcean: %s' % str(e)
            )

        if not req.ok:
            msg = [data[m] for m in ("id", "message") if m in data][1]
            raise DataReadError(msg)

        # init request limits
        self.__init_ratelimit(req.headers)

        # If there are more elements available (total) than the elements per
        # page, try to deal with pagination. Note: Breaking the logic on
        # multiple pages,
        pages = data.get("links", {}).get("pages", {})
        if pages.get("next") and "page" not in params:
            return self.__deal_with_pagination(url, type, params, data)
        else:
            return data

    def __str__(self):
        return "<%s>" % self.__class__.__name__

    def __unicode__(self):
        return u"%s" % self.__str__()

    def __repr__(self):
        return str(self)