
|
# Copyright 2017 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helpers for wrapping low-level gRPC methods with common functionality.
This is used by gapic clients to provide common error mapping, retry, timeout,
compression, pagination, and long-running operations to gRPC methods.
"""
import enum
import functools
from google.api_core import grpc_helpers
from google.api_core.gapic_v1 import client_info
from google.api_core.timeout import TimeToDeadlineTimeout
USE_DEFAULT_METADATA = object()
class _MethodDefault(enum.Enum):
# Uses enum so that pytype/mypy knows that this is the only possible value.
# https://stackoverflow.com/a/60605919/101923
#
# Literal[_DEFAULT_VALUE] is an alternative, but only added in Python 3.8.
# https://docs.python.org/3/library/typing.html#typing.Literal
_DEFAULT_VALUE = object()
DEFAULT = _MethodDefault._DEFAULT_VALUE
"""Sentinel value indicating that a retry, timeout, or compression argument was unspecified,
so the default should be used."""
def _is_not_none_or_false(value):
return value is not None and value is not False
def _apply_decorators(func, decorators):
"""Apply a list of decorators to a given function.
``decorators`` may contain items that are ``None`` or ``False`` which will
be ignored.
"""
filtered_decorators = filter(_is_not_none_or_false, reversed(decorators))
for decorator in filtered_decorators:
func = decorator(func)
return func
class _GapicCallable(object):
"""Callable that applies retry, timeout, and metadata logic.
Args:
target (Callable): The low-level RPC method.
retry (google.api_core.retry.Retry): The default retry for the
callable. If ``None``, this callable will not retry by default
timeout (google.api_core.timeout.Timeout): The default timeout for the
callable (i.e. duration of time within which an RPC must terminate
after its start, not to be confused with deadline). If ``None``,
this callable will not specify a timeout argument to the low-level
RPC method.
compression (grpc.Compression): The default compression for the callable.
If ``None``, this callable will not specify a compression argument
to the low-level RPC method.
metadata (Sequence[Tuple[str, str]]): Additional metadata that is
provided to the RPC method on every invocation. This is merged with
any metadata specified during invocation. If ``None``, no
additional metadata will be passed to the RPC method.
"""
def __init__(
self,
target,
retry,
timeout,
compression,
metadata=None,
):
self._target = target
self._retry = retry
self._timeout = timeout
self._compression = compression
self._metadata = metadata
def __call__(
self, *args, timeout=DEFAULT, retry=DEFAULT, compression=DEFAULT, **kwargs
):
"""Invoke the low-level RPC with retry, timeout, compression, and metadata."""
if retry is DEFAULT:
retry = self._retry
if timeout is DEFAULT:
timeout = self._timeout
if compression is DEFAULT:
compression = self._compression
if isinstance(timeout, (int, float)):
timeout = TimeToDeadlineTimeout(timeout=timeout)
# Apply all applicable decorators.
wrapped_func = _apply_decorators(self._target, [retry, timeout])
# Add the user agent metadata to the call.
if self._metadata is not None:
metadata = kwargs.get("metadata", [])
# Due to the nature of invocation, None should be treated the same
# as not specified.
if metadata is None:
metadata = []
metadata = list(metadata)
metadata.extend(self._metadata)
kwargs["metadata"] = metadata
if self._compression is not None:
kwargs["compression"] = compression
return wrapped_func(*args, **kwargs)
def wrap_method(
func,
default_retry=None,
default_timeout=None,
default_compression=None,
client_info=client_info.DEFAULT_CLIENT_INFO,
*,
with_call=False,
):
"""Wrap an RPC method with common behavior.
This applies common error wrapping, retry, timeout, and compression behavior to a function.
The wrapped function will take optional ``retry``, ``timeout``, and ``compression``
arguments.
For example::
import google.api_core.gapic_v1.method
from google.api_core import retry
from google.api_core import timeout
from grpc import Compression
# The original RPC method.
def get_topic(name, timeout=None):
request = publisher_v2.GetTopicRequest(name=name)
return publisher_stub.GetTopic(request, timeout=timeout)
default_retry = retry.Retry(deadline=60)
default_timeout = timeout.Timeout(deadline=60)
default_compression = Compression.NoCompression
wrapped_get_topic = google.api_core.gapic_v1.method.wrap_method(
get_topic, default_retry)
# Execute get_topic with default retry and timeout:
response = wrapped_get_topic()
# Execute get_topic without doing any retying but with the default
# timeout:
response = wrapped_get_topic(retry=None)
# Execute get_topic but only retry on 5xx errors:
my_retry = retry.Retry(retry.if_exception_type(
exceptions.InternalServerError))
response = wrapped_get_topic(retry=my_retry)
The way this works is by late-wrapping the given function with the retry
and timeout decorators. Essentially, when ``wrapped_get_topic()`` is
called:
* ``get_topic()`` is first wrapped with the ``timeout`` into
``get_topic_with_timeout``.
* ``get_topic_with_timeout`` is wrapped with the ``retry`` into
``get_topic_with_timeout_and_retry()``.
* The final ``get_topic_with_timeout_and_retry`` is called passing through
the ``args`` and ``kwargs``.
The callstack is therefore::
method.__call__() ->
Retry.__call__() ->
Timeout.__call__() ->
wrap_errors() ->
get_topic()
Note that if ``timeout`` or ``retry`` is ``None``, then they are not
applied to the function. For example,
``wrapped_get_topic(timeout=None, retry=None)`` is more or less
equivalent to just calling ``get_topic`` but with error re-mapping.
Args:
func (Callable[Any]): The function to wrap. It should accept an
optional ``timeout`` argument. If ``metadata`` is not ``None``, it
should accept a ``metadata`` argument.
default_retry (Optional[google.api_core.Retry]): The default retry
strategy. If ``None``, the method will not retry by default.
default_timeout (Optional[google.api_core.Timeout]): The default
timeout strategy. Can also be specified as an int or float. If
``None``, the method will not have timeout specified by default.
default_compression (Optional[grpc.Compression]): The default
grpc.Compression. If ``None``, the method will not have
compression specified by default.
client_info
(Optional[google.api_core.gapic_v1.client_info.ClientInfo]):
Client information used to create a user-agent string that's
passed as gRPC metadata to the method. If unspecified, then
a sane default will be used. If ``None``, then no user agent
metadata will be provided to the RPC method.
with_call (bool): If True, wrapped grpc.UnaryUnaryMulticallables will
return a tuple of (response, grpc.Call) instead of just the response.
This is useful for extracting trailing metadata from unary calls.
Defaults to False.
Returns:
Callable: A new callable that takes optional ``retry``, ``timeout``,
and ``compression``
arguments and applies the common error mapping, retry, timeout, compression,
and metadata behavior to the low-level RPC method.
"""
if with_call:
try:
func = func.with_call
except AttributeError as exc:
raise ValueError(
"with_call=True is only supported for unary calls."
) from exc
func = grpc_helpers.wrap_errors(func)
if client_info is not None:
user_agent_metadata = [client_info.to_grpc_metadata()]
else:
user_agent_metadata = None
return functools.wraps(func)(
_GapicCallable(
func,
default_retry,
default_timeout,
default_compression,
metadata=user_agent_metadata,
)
)
|