1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315
|
# Copyright 2013 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
# either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Util functions and classes for cloudstorage_api."""
# Names exported by a star-import of this module.
__all__ = ['set_default_retry_params',
'RetryParams',
]
import copy
import httplib
import logging
import math
import os
import threading
import time
import urllib
try:
from google.appengine.api import urlfetch
from google.appengine.datastore import datastore_rpc
from google.appengine.ext.ndb import eventloop
from google.appengine.ext.ndb import utils
from google.appengine import runtime
from google.appengine.runtime import apiproxy_errors
except ImportError:
from google.appengine.api import urlfetch
from google.appengine.datastore import datastore_rpc
from google.appengine import runtime
from google.appengine.runtime import apiproxy_errors
from google.appengine.ext.ndb import eventloop
from google.appengine.ext.ndb import utils
# Exception types that _retry_fetch swallows and retries on.
_RETRIABLE_EXCEPTIONS = (urlfetch.DownloadError,
apiproxy_errors.Error)
# Per-thread holder for the default RetryParams.
_thread_local_settings = threading.local()
# NOTE: attributes of a threading.local are per-thread, so this assignment
# only initializes the thread that imports the module. Other threads rely on
# the getattr(..., None) fallback in _get_default_retry_params.
_thread_local_settings.default_retry_params = None
def set_default_retry_params(retry_params):
  """Install the default RetryParams for the current thread and request.

  A shallow copy is stored, so rebinding attributes on the caller's object
  afterwards does not affect the stored default.

  Args:
    retry_params: the RetryParams instance to use as the default.
  """
  snapshot = copy.copy(retry_params)
  _thread_local_settings.default_retry_params = snapshot
def _get_default_retry_params():
  """Fetch the default RetryParams for the current thread and request.

  Returns:
    A shallow copy of the stored default when one exists and it was created
    during the current request; otherwise a freshly constructed RetryParams.
  """
  stored = getattr(_thread_local_settings, 'default_retry_params', None)
  # Only reuse a stored default that was set during this very request.
  if stored is not None and stored.belong_to_current_request():
    return copy.copy(stored)
  return RetryParams()
def _quote_filename(filename):
  """Percent-encode a filename so it can serve as a URI path.

  Args:
    filename: user provided filename, in the form /bucket/filename.

  Returns:
    The filename quoted for use as the path component of a URI.
  """
  quoted = urllib.quote(filename)
  return quoted
def _unquote_filename(filename):
  """Decode a percent-encoded URI path back into a filename.

  Inverse of _quote_filename.

  Args:
    filename: a quoted filename, e.g. /bucket/some%20filename.

  Returns:
    The filename with percent-escapes decoded.
  """
  plain = urllib.unquote(filename)
  return plain
def _should_retry(resp):
  """Tell whether a urlfetch response warrants retrying the request.

  Args:
    resp: a response object exposing a status_code attribute.

  Returns:
    True for HTTP 408 (request timeout) and for any status in the range
    500 <= status < 600; False otherwise.
  """
  status = resp.status_code
  if status == httplib.REQUEST_TIMEOUT:
    return True
  return 500 <= status < 600
class RetryParams(object):
  """Retry configuration parameters.

  Holds the backoff settings consulted when a request is retried, and records
  the REQUEST_LOG_ID environment variable seen at construction time so that
  instances left over from an earlier request can be recognized.
  """

  # NOTE(review): presumably restricts callers to keyword arguments (only
  # `self` positional) -- confirm against datastore_rpc._positional.
  @datastore_rpc._positional(1)
  def __init__(self,
               backoff_factor=2.0,
               initial_delay=0.1,
               max_delay=10.0,
               min_retries=2,
               max_retries=5,
               max_retry_period=30.0,
               urlfetch_timeout=None,
               save_access_token=False):
    """Init.

    This object is unique per request per thread.

    Library will retry according to this setting when App Engine Server
    can't call urlfetch, urlfetch timed out, or urlfetch got a 408 or
    5xx response.

    Args:
      backoff_factor: exponential backoff multiplier.
      initial_delay: seconds to delay for the first retry.
      max_delay: max seconds to delay for every retry.
      min_retries: min number of times to retry. This value is automatically
        capped by max_retries.
      max_retries: max number of times to retry. Set this to 0 for no retry.
      max_retry_period: max total seconds spent on retry. Retry stops when
        this period passed AND min_retries has been attempted.
      urlfetch_timeout: timeout for urlfetch in seconds. Could be None,
        in which case the value will be chosen by urlfetch module.
      save_access_token: persist access token to datastore to avoid
        excessive usage of GetAccessToken API. Usually the token is cached
        in process and in memcache. In some cases, memcache isn't very
        reliable.

    Raises:
      ValueError: when a numeric argument is negative, or is zero where zero
        is not allowed.
      TypeError: when an argument has an unexpected type.
    """
    self.backoff_factor = self._check('backoff_factor', backoff_factor)
    self.initial_delay = self._check('initial_delay', initial_delay)
    self.max_delay = self._check('max_delay', max_delay)
    self.max_retry_period = self._check('max_retry_period', max_retry_period)
    self.max_retries = self._check('max_retries', max_retries, True, int)
    self.min_retries = self._check('min_retries', min_retries, True, int)
    # min_retries can never exceed max_retries.
    if self.min_retries > self.max_retries:
      self.min_retries = self.max_retries

    self.urlfetch_timeout = None
    if urlfetch_timeout is not None:
      self.urlfetch_timeout = self._check('urlfetch_timeout', urlfetch_timeout)
    self.save_access_token = self._check('save_access_token', save_access_token,
                                         True, bool)

    # Remember which request created this object; see
    # belong_to_current_request.
    self._request_id = os.getenv('REQUEST_LOG_ID')

  def __eq__(self, other):
    """Two instances are equal when all their attributes are equal."""
    if not isinstance(other, self.__class__):
      return False
    return self.__dict__ == other.__dict__

  def __ne__(self, other):
    """Inverse of __eq__."""
    return not self.__eq__(other)

  @classmethod
  def _check(cls, name, val, can_be_zero=False, val_type=float):
    """Check init arguments.

    Args:
      name: name of the argument. For logging purpose.
      val: value. Value has to be non negative number.
      can_be_zero: whether value can be zero.
      val_type: Python type of the value.

    Returns:
      The value.

    Raises:
      ValueError: when invalid value is passed in.
      TypeError: when invalid value type is passed in.
    """
    valid_types = [val_type]
    # A float parameter may also be given as an int.
    if val_type is float:
      valid_types.append(int)

    # Exact type match (not isinstance): subclasses such as bool are rejected
    # where int or float is expected.
    if type(val) not in valid_types:
      raise TypeError(
          'Expect type %s for parameter %s' % (val_type.__name__, name))
    if val < 0:
      raise ValueError(
          'Value for parameter %s has to be greater than 0' % name)
    if not can_be_zero and val == 0:
      raise ValueError(
          'Value for parameter %s can not be 0' % name)
    return val

  def belong_to_current_request(self):
    """Return True if this object was created during the current request.

    Compares the REQUEST_LOG_ID environment variable now with the value
    recorded when the object was constructed.
    """
    return os.getenv('REQUEST_LOG_ID') == self._request_id

  def delay(self, n, start_time):
    """Calculate delay before the next retry.

    Args:
      n: the number of current attempt. The first attempt should be 1.
      start_time: the time when retry started in unix time.

    Returns:
      Number of seconds to wait before next retry. -1 if retry should give up.
    """
    # Give up once max_retries is exhausted, or once the retry period has
    # elapsed and at least min_retries attempts have been made.
    if (n > self.max_retries or
        (n > self.min_retries and
         time.time() - start_time > self.max_retry_period)):
      return -1
    try:
      backoff = math.pow(self.backoff_factor, n - 1) * self.initial_delay
    except OverflowError:
      # math.pow raises instead of returning inf when the result is out of
      # float range; such a delay is by definition above the cap.
      return self.max_delay
    return min(backoff, self.max_delay)
def _retry_fetch(url, retry_params, **kwds):
  """A blocking fetch function similar to urlfetch.fetch.

  This function should be used when a urlfetch has timed out or the response
  shows http request timeout. This function will put current thread to
  sleep between retry backoffs.

  Args:
    url: url to fetch.
    retry_params: an instance of RetryParams. Only its delay() method is
      consulted here.
    **kwds: keyword arguments for urlfetch. They are forwarded to
      urlfetch.fetch unchanged; this function does not apply
      retry_params.urlfetch_timeout itself, so a deadline must be supplied
      here if one is wanted.

  Returns:
    A urlfetch response from the last retry: either the first response whose
    status does not call for another retry, or the last retriable-status
    response once retries are exhausted. None if no retry was attempted.

  Raises:
    runtime.DeadlineExceededError: re-raised immediately, without further
      retries.
    Whatever retriable exception was encountered during the last retry, when
    retries are exhausted without obtaining a response.
  """
  n = 1
  start_time = time.time()
  delay = retry_params.delay(n, start_time)
  if delay <= 0:
    return None

  logging.info('Will retry request to %s.', url)
  resp = None
  while delay > 0:
    try:
      logging.info('Retry in %s seconds.', delay)
      time.sleep(delay)
      resp = urlfetch.fetch(url, **kwds)
    except runtime.DeadlineExceededError:
      logging.info(
          'Urlfetch retry %s will exceed request deadline '
          'after %s seconds total', n, time.time() - start_time)
      raise
    except _RETRIABLE_EXCEPTIONS as e:
      n += 1
      delay = retry_params.delay(n, start_time)
      logging.info(
          'Got exception "%r" while contacting GCS.', e)
      if delay <= 0:
        logging.info(
            'Urlfetch failed after %s retries and %s seconds in total.',
            n - 1, time.time() - start_time)
        # Re-raise from inside the handler so the exception (and its
        # traceback) being propagated is unambiguous.
        raise
      continue

    n += 1
    delay = retry_params.delay(n, start_time)
    if not _should_retry(resp):
      return resp
    logging.info(
        'Got status %s from GCS.', resp.status_code)

  # Retries exhausted while the server kept answering with a retriable
  # status: hand the last such response back to the caller.
  return resp
def _run_until_rpc():
  """Eagerly evaluate tasklets until they block on some RPC.

  Usually the ndb eventloop isn't run until some code calls
  future.get_result().

  When an async tasklet is called, the tasklet wrapper evaluates the tasklet
  code into a generator, enqueues a callback _help_tasklet_along onto
  the eventloop's current queue, and returns a future.

  _help_tasklet_along, when called by the eventloop, will get one yielded
  value from the generator. If the value is another future, it sets up a
  callback _on_future_complete to invoke _help_tasklet_along when the
  dependent future fulfills. If the value is an RPC, it sets up a callback
  _on_rpc_complete to invoke _help_tasklet_along when the RPC fulfills.

  Thus _help_tasklet_along drills down the chain of futures until some future
  is blocked by an RPC. The eventloop runs all callbacks and constantly checks
  pending RPC status.
  """
  loop = eventloop.get_event_loop()
  # Keep stepping the loop for as long as its `current` queue is non-empty.
  while loop.current:
    loop.run0()
def _eager_tasklet(tasklet):
  """Decorator that makes a tasklet start running as soon as it is called.

  Args:
    tasklet: the tasklet function to wrap.

  Returns:
    A wrapper that calls the tasklet, runs _run_until_rpc, and then returns
    the tasklet's future.
  """
  @utils.wrapping(tasklet)
  def eager_wrapper(*args, **kwds):
    future = tasklet(*args, **kwds)
    # Drive the event loop now instead of waiting for a get_result() call.
    _run_until_rpc()
    return future

  return eager_wrapper
|