1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253
|
# Copyright 2017 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helpers for wrapping low-level gRPC methods with common functionality.
This is used by gapic clients to provide common error mapping, retry, timeout,
compression, pagination, and long-running operations to gRPC methods.
"""
import enum
import functools
from google.api_core import grpc_helpers
from google.api_core.gapic_v1 import client_info
from google.api_core.timeout import TimeToDeadlineTimeout
USE_DEFAULT_METADATA = object()
class _MethodDefault(enum.Enum):
# Uses enum so that pytype/mypy knows that this is the only possible value.
# https://stackoverflow.com/a/60605919/101923
#
# Literal[_DEFAULT_VALUE] is an alternative, but only added in Python 3.8.
# https://docs.python.org/3/library/typing.html#typing.Literal
_DEFAULT_VALUE = object()
DEFAULT = _MethodDefault._DEFAULT_VALUE
"""Sentinel value indicating that a retry, timeout, or compression argument was unspecified,
so the default should be used."""
def _is_not_none_or_false(value):
return value is not None and value is not False
def _apply_decorators(func, decorators):
"""Apply a list of decorators to a given function.
``decorators`` may contain items that are ``None`` or ``False`` which will
be ignored.
"""
filtered_decorators = filter(_is_not_none_or_false, reversed(decorators))
for decorator in filtered_decorators:
func = decorator(func)
return func
class _GapicCallable(object):
"""Callable that applies retry, timeout, and metadata logic.
Args:
target (Callable): The low-level RPC method.
retry (google.api_core.retry.Retry): The default retry for the
callable. If ``None``, this callable will not retry by default
timeout (google.api_core.timeout.Timeout): The default timeout for the
callable (i.e. duration of time within which an RPC must terminate
after its start, not to be confused with deadline). If ``None``,
this callable will not specify a timeout argument to the low-level
RPC method.
compression (grpc.Compression): The default compression for the callable.
If ``None``, this callable will not specify a compression argument
to the low-level RPC method.
metadata (Sequence[Tuple[str, str]]): Additional metadata that is
provided to the RPC method on every invocation. This is merged with
any metadata specified during invocation. If ``None``, no
additional metadata will be passed to the RPC method.
"""
def __init__(
self,
target,
retry,
timeout,
compression,
metadata=None,
):
self._target = target
self._retry = retry
self._timeout = timeout
self._compression = compression
self._metadata = metadata
def __call__(
self, *args, timeout=DEFAULT, retry=DEFAULT, compression=DEFAULT, **kwargs
):
"""Invoke the low-level RPC with retry, timeout, compression, and metadata."""
if retry is DEFAULT:
retry = self._retry
if timeout is DEFAULT:
timeout = self._timeout
if compression is DEFAULT:
compression = self._compression
if isinstance(timeout, (int, float)):
timeout = TimeToDeadlineTimeout(timeout=timeout)
# Apply all applicable decorators.
wrapped_func = _apply_decorators(self._target, [retry, timeout])
# Add the user agent metadata to the call.
if self._metadata is not None:
metadata = kwargs.get("metadata", [])
# Due to the nature of invocation, None should be treated the same
# as not specified.
if metadata is None:
metadata = []
metadata = list(metadata)
metadata.extend(self._metadata)
kwargs["metadata"] = metadata
if self._compression is not None:
kwargs["compression"] = compression
return wrapped_func(*args, **kwargs)
def wrap_method(
func,
default_retry=None,
default_timeout=None,
default_compression=None,
client_info=client_info.DEFAULT_CLIENT_INFO,
*,
with_call=False,
):
"""Wrap an RPC method with common behavior.
This applies common error wrapping, retry, timeout, and compression behavior to a function.
The wrapped function will take optional ``retry``, ``timeout``, and ``compression``
arguments.
For example::
import google.api_core.gapic_v1.method
from google.api_core import retry
from google.api_core import timeout
from grpc import Compression
# The original RPC method.
def get_topic(name, timeout=None):
request = publisher_v2.GetTopicRequest(name=name)
return publisher_stub.GetTopic(request, timeout=timeout)
default_retry = retry.Retry(deadline=60)
default_timeout = timeout.Timeout(deadline=60)
default_compression = Compression.NoCompression
wrapped_get_topic = google.api_core.gapic_v1.method.wrap_method(
get_topic, default_retry)
# Execute get_topic with default retry and timeout:
response = wrapped_get_topic()
# Execute get_topic without doing any retying but with the default
# timeout:
response = wrapped_get_topic(retry=None)
# Execute get_topic but only retry on 5xx errors:
my_retry = retry.Retry(retry.if_exception_type(
exceptions.InternalServerError))
response = wrapped_get_topic(retry=my_retry)
The way this works is by late-wrapping the given function with the retry
and timeout decorators. Essentially, when ``wrapped_get_topic()`` is
called:
* ``get_topic()`` is first wrapped with the ``timeout`` into
``get_topic_with_timeout``.
* ``get_topic_with_timeout`` is wrapped with the ``retry`` into
``get_topic_with_timeout_and_retry()``.
* The final ``get_topic_with_timeout_and_retry`` is called passing through
the ``args`` and ``kwargs``.
The callstack is therefore::
method.__call__() ->
Retry.__call__() ->
Timeout.__call__() ->
wrap_errors() ->
get_topic()
Note that if ``timeout`` or ``retry`` is ``None``, then they are not
applied to the function. For example,
``wrapped_get_topic(timeout=None, retry=None)`` is more or less
equivalent to just calling ``get_topic`` but with error re-mapping.
Args:
func (Callable[Any]): The function to wrap. It should accept an
optional ``timeout`` argument. If ``metadata`` is not ``None``, it
should accept a ``metadata`` argument.
default_retry (Optional[google.api_core.Retry]): The default retry
strategy. If ``None``, the method will not retry by default.
default_timeout (Optional[google.api_core.Timeout]): The default
timeout strategy. Can also be specified as an int or float. If
``None``, the method will not have timeout specified by default.
default_compression (Optional[grpc.Compression]): The default
grpc.Compression. If ``None``, the method will not have
compression specified by default.
client_info
(Optional[google.api_core.gapic_v1.client_info.ClientInfo]):
Client information used to create a user-agent string that's
passed as gRPC metadata to the method. If unspecified, then
a sane default will be used. If ``None``, then no user agent
metadata will be provided to the RPC method.
with_call (bool): If True, wrapped grpc.UnaryUnaryMulticallables will
return a tuple of (response, grpc.Call) instead of just the response.
This is useful for extracting trailing metadata from unary calls.
Defaults to False.
Returns:
Callable: A new callable that takes optional ``retry``, ``timeout``,
and ``compression``
arguments and applies the common error mapping, retry, timeout, compression,
and metadata behavior to the low-level RPC method.
"""
if with_call:
try:
func = func.with_call
except AttributeError as exc:
raise ValueError(
"with_call=True is only supported for unary calls."
) from exc
func = grpc_helpers.wrap_errors(func)
if client_info is not None:
user_agent_metadata = [client_info.to_grpc_metadata()]
else:
user_agent_metadata = None
return functools.wraps(func)(
_GapicCallable(
func,
default_retry,
default_timeout,
default_compression,
metadata=user_agent_metadata,
)
)
|