File: storageclient.py

package info (click to toggle)
azure-multiapi-storage-python 0.5.2-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 23,652 kB
  • sloc: python: 222,802; sh: 61; makefile: 3
file content (368 lines) | stat: -rwxr-xr-x 16,912 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
# -------------------------------------------------------------------------
# Copyright (c) Microsoft.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# --------------------------------------------------------------------------

import sys
from abc import ABCMeta
import logging

logger = logging.getLogger(__name__)
from time import sleep

import requests
from azure.common import (
    AzureException,
)

from ._constants import (
    DEFAULT_SOCKET_TIMEOUT,
    DEFAULT_X_MS_VERSION,
    DEFAULT_USER_AGENT_STRING,
    USER_AGENT_STRING_PREFIX,
    USER_AGENT_STRING_SUFFIX,
)
from ._error import (
    _ERROR_DECRYPTION_FAILURE,
    _http_error_handler,
)
from ._http import HTTPError
from ._http.httpclient import _HTTPClient
from ._serialization import (
    _update_request,
    _add_date_header,
)
from .models import (
    RetryContext,
    LocationMode,
    _OperationContext,
)
from .retry import ExponentialRetry


class StorageClient(object):
    '''
    This is the base class for service objects. Service objects are used to do 
    all requests to Storage. This class cannot be instantiated directly.

    :ivar str account_name:
        The storage account name. This is used to authenticate requests 
        signed with an account key and to construct the storage endpoint. It 
        is required unless a connection string is given, or if a custom 
        domain is used with anonymous authentication.
    :ivar str account_key:
        The storage account key. This is used for shared key authentication. 
        If neither account key or sas token is specified, anonymous access 
        will be used.
    :ivar str sas_token:
        A shared access signature token to use to authenticate requests 
        instead of the account key. If account key and sas token are both 
        specified, account key will be used to sign. If neither are 
        specified, anonymous access will be used.
    :ivar str primary_endpoint:
        The endpoint to send storage requests to.
    :ivar str secondary_endpoint:
        The secondary endpoint to read storage data from. This will only be a 
        valid endpoint if the storage account used is RA-GRS and thus allows 
        reading from secondary.
    :ivar function(context) retry:
        A function which determines whether to retry. Takes as a parameter a 
        :class:`~azure.storage.common.models.RetryContext` object. Returns the number
        of seconds to wait before retrying the request, or None to indicate not 
        to retry.
    :ivar ~azure.storage.common.models.LocationMode location_mode:
        The host location to use to make requests. Defaults to LocationMode.PRIMARY.
        Note that this setting only applies to RA-GRS accounts as other account 
        types do not allow reading from secondary. If the location_mode is set to 
        LocationMode.SECONDARY, read requests will be sent to the secondary endpoint. 
        Write requests will continue to be sent to primary.
    :ivar str protocol:
        The protocol to use for requests. Defaults to https.
    :ivar requests.Session request_session:
        The session object to use for http requests.
    :ivar function(request) request_callback:
        A function called immediately before each request is sent. This function 
        takes as a parameter the request object and returns nothing. It may be 
        used to added custom headers or log request data.
    :ivar function() response_callback:
        A function called immediately after each response is received. This 
        function takes as a parameter the response object and returns nothing. 
        It may be used to log response data.
    :ivar function() retry_callback:
        A function called immediately after retry evaluation is performed. This 
        function takes as a parameter the retry context object and returns nothing. 
        It may be used to detect retries and log context information.
    '''

    __metaclass__ = ABCMeta

    def __init__(self, connection_params):
        '''
        :param obj connection_params: The parameters to use to construct the client.
        '''
        self.account_name = connection_params.account_name
        self.account_key = connection_params.account_key
        self.sas_token = connection_params.sas_token
        self.is_emulated = connection_params.is_emulated

        self.primary_endpoint = connection_params.primary_endpoint
        self.secondary_endpoint = connection_params.secondary_endpoint

        protocol = connection_params.protocol
        request_session = connection_params.request_session or requests.Session()
        socket_timeout = connection_params.socket_timeout or DEFAULT_SOCKET_TIMEOUT
        self._httpclient = _HTTPClient(
            protocol=protocol,
            session=request_session,
            timeout=socket_timeout,
        )

        self.retry = ExponentialRetry().retry
        self.location_mode = LocationMode.PRIMARY

        self.request_callback = None
        self.response_callback = None
        self.retry_callback = None
        self._X_MS_VERSION = DEFAULT_X_MS_VERSION
        self._USER_AGENT_STRING = DEFAULT_USER_AGENT_STRING

    def _update_user_agent_string(self, service_package_version):
        self._USER_AGENT_STRING = '{}{} {}'.format(USER_AGENT_STRING_PREFIX,
                                                   service_package_version,
                                                   USER_AGENT_STRING_SUFFIX)

    @property
    def socket_timeout(self):
        return self._httpclient.timeout

    @socket_timeout.setter
    def socket_timeout(self, value):
        self._httpclient.timeout = value

    @property
    def protocol(self):
        return self._httpclient.protocol

    @protocol.setter
    def protocol(self, value):
        self._httpclient.protocol = value

    @property
    def request_session(self):
        return self._httpclient.session

    @request_session.setter
    def request_session(self, value):
        self._httpclient.session = value

    def set_proxy(self, host, port, user=None, password=None):
        '''
        Sets the proxy server host and port for the HTTP CONNECT Tunnelling.

        :param str host: Address of the proxy. Ex: '192.168.0.100'
        :param int port: Port of the proxy. Ex: 6000
        :param str user: User for proxy authorization.
        :param str password: Password for proxy authorization.
        '''
        self._httpclient.set_proxy(host, port, user, password)

    def _get_host_locations(self, primary=True, secondary=False):
        locations = {}
        if primary:
            locations[LocationMode.PRIMARY] = self.primary_endpoint
        if secondary:
            locations[LocationMode.SECONDARY] = self.secondary_endpoint
        return locations

    def _apply_host(self, request, operation_context, retry_context):
        if operation_context.location_lock and operation_context.host_location:
            # If this is a location locked operation and the location is set, 
            # override the request location and host_location.
            request.host_locations = operation_context.host_location
            request.host = list(operation_context.host_location.values())[0]
            retry_context.location_mode = list(operation_context.host_location.keys())[0]
        elif len(request.host_locations) == 1:
            # If only one location is allowed, use that location.
            request.host = list(request.host_locations.values())[0]
            retry_context.location_mode = list(request.host_locations.keys())[0]
        else:
            # If multiple locations are possible, choose based on the location mode.
            request.host = request.host_locations.get(self.location_mode)
            retry_context.location_mode = self.location_mode

    @staticmethod
    def extract_date_and_request_id(retry_context):
        if getattr(retry_context, 'response', None) is None:
            return ""
        resp = retry_context.response

        if 'date' in resp.headers and 'x-ms-request-id' in resp.headers:
            return str.format("Server-Timestamp={0}, Server-Request-ID={1}",
                              resp.headers['date'], resp.headers['x-ms-request-id'])
        elif 'date' in resp.headers:
            return str.format("Server-Timestamp={0}", resp.headers['date'])
        elif 'x-ms-request-id' in resp.headers:
            return str.format("Server-Request-ID={0}", resp.headers['x-ms-request-id'])
        else:
            return ""

    def _perform_request(self, request, parser=None, parser_args=None, operation_context=None):
        '''
        Sends the request and return response. Catches HTTPError and hands it
        to error handler
        '''
        operation_context = operation_context or _OperationContext()
        retry_context = RetryContext()

        # Apply the appropriate host based on the location mode
        self._apply_host(request, operation_context, retry_context)

        # Apply common settings to the request
        _update_request(request, self._X_MS_VERSION, self._USER_AGENT_STRING)
        client_request_id_prefix = str.format("Client-Request-ID={0}", request.headers['x-ms-client-request-id'])

        while True:
            try:
                try:
                    # Execute the request callback 
                    if self.request_callback:
                        self.request_callback(request)

                    # Add date and auth after the callback so date doesn't get too old and 
                    # authentication is still correct if signed headers are added in the request 
                    # callback. This also ensures retry policies with long back offs 
                    # will work as it resets the time sensitive headers.
                    _add_date_header(request)
                    self.authentication.sign_request(request)

                    # Set the request context
                    retry_context.request = request

                    # Log the request before it goes out
                    logger.info("%s Outgoing request: Method=%s, Path=%s, Query=%s, Headers=%s.",
                                client_request_id_prefix,
                                request.method,
                                request.path,
                                request.query,
                                str(request.headers).replace('\n', ''))

                    # Perform the request
                    response = self._httpclient.perform_request(request)

                    # Execute the response callback
                    if self.response_callback:
                        self.response_callback(response)

                    # Set the response context
                    retry_context.response = response

                    # Log the response when it comes back
                    logger.info("%s Receiving Response: "
                                "%s, HTTP Status Code=%s, Message=%s, Headers=%s.",
                                client_request_id_prefix,
                                self.extract_date_and_request_id(retry_context),
                                response.status,
                                response.message,
                                str(request.headers).replace('\n', ''))

                    # Parse and wrap HTTP errors in AzureHttpError which inherits from AzureException
                    if response.status >= 300:
                        # This exception will be caught by the general error handler
                        # and raised as an azure http exception
                        _http_error_handler(
                            HTTPError(response.status, response.message, response.headers, response.body))

                    # Parse the response
                    if parser:
                        if parser_args:
                            args = [response]
                            args.extend(parser_args)
                            return parser(*args)
                        else:
                            return parser(response)
                    else:
                        return
                except AzureException as ex:
                    retry_context.exception = ex
                    raise ex
                except Exception as ex:
                    retry_context.exception = ex
                    if sys.version_info >= (3,):
                        # Automatic chaining in Python 3 means we keep the trace
                        raise AzureException(ex.args[0])
                    else:
                        # There isn't a good solution in 2 for keeping the stack trace 
                        # in general, or that will not result in an error in 3
                        # However, we can keep the previous error type and message
                        # TODO: In the future we will log the trace
                        msg = ""
                        if len(ex.args) > 0:
                            msg = ex.args[0]
                        raise AzureException('{}: {}'.format(ex.__class__.__name__, msg))

            except AzureException as ex:
                # only parse the strings used for logging if logging is at least enabled for CRITICAL
                if logger.isEnabledFor(logging.CRITICAL):
                    exception_str_in_one_line = str(ex).replace('\n', '')
                    status_code = retry_context.response.status if retry_context.response is not None else 'Unknown'
                    timestamp_and_request_id = self.extract_date_and_request_id(retry_context)

                logger.info("%s Operation failed: checking if the operation should be retried. "
                            "Current retry count=%s, %s, HTTP status code=%s, Exception=%s.",
                            client_request_id_prefix,
                            retry_context.count if hasattr(retry_context, 'count') else 0,
                            timestamp_and_request_id,
                            status_code,
                            exception_str_in_one_line)

                # Decryption failures (invalid objects, invalid algorithms, data unencrypted in strict mode, etc)
                # will not be resolved with retries.
                if str(ex) == _ERROR_DECRYPTION_FAILURE:
                    logger.error("%s Encountered decryption failure: this cannot be retried. "
                                 "%s, HTTP status code=%s, Exception=%s.",
                                 client_request_id_prefix,
                                 timestamp_and_request_id,
                                 status_code,
                                 exception_str_in_one_line)
                    raise ex

                # Determine whether a retry should be performed and if so, how 
                # long to wait before performing retry.
                retry_interval = self.retry(retry_context)
                if retry_interval is not None:
                    # Execute the callback
                    if self.retry_callback:
                        self.retry_callback(retry_context)

                    logger.info(
                        "%s Retry policy is allowing a retry: Retry count=%s, Interval=%s.",
                        client_request_id_prefix,
                        retry_context.count,
                        retry_interval)

                    # Sleep for the desired retry interval
                    sleep(retry_interval)
                else:
                    logger.error("%s Retry policy did not allow for a retry: "
                                 "%s, HTTP status code=%s, Exception=%s.",
                                 client_request_id_prefix,
                                 timestamp_and_request_id,
                                 status_code,
                                 exception_str_in_one_line)
                    raise ex
            finally:
                # If this is a location locked operation and the location is not set, 
                # this is the first request of that operation. Set the location to 
                # be used for subsequent requests in the operation.
                if operation_context.location_lock and not operation_context.host_location:
                    operation_context.host_location = {retry_context.location_mode: request.host}