File: method.py

package info (click to toggle)
python-google-api-core 2.25.1-1~exp1
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 1,304 kB
  • sloc: python: 15,274; makefile: 3
file content (253 lines) | stat: -rw-r--r-- 9,494 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
# Copyright 2017 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helpers for wrapping low-level gRPC methods with common functionality.

This is used by gapic clients to provide common error mapping, retry, timeout,
compression, pagination, and long-running operations to gRPC methods.
"""

import enum
import functools

from google.api_core import grpc_helpers
from google.api_core.gapic_v1 import client_info
from google.api_core.timeout import TimeToDeadlineTimeout

# Module-level sentinel object. No other code visible in this module reads it.
# NOTE(review): presumably kept so external code importing this name keeps
# working -- confirm before removing.
USE_DEFAULT_METADATA = object()


class _MethodDefault(enum.Enum):
    """Single-member enum whose only member acts as an "unspecified" sentinel."""

    # Uses enum so that pytype/mypy knows that this is the only possible value.
    # https://stackoverflow.com/a/60605919/101923
    #
    # Literal[_DEFAULT_VALUE] is an alternative, but only added in Python 3.8.
    # https://docs.python.org/3/library/typing.html#typing.Literal
    _DEFAULT_VALUE = object()


# Alias for the sole ``_MethodDefault`` member. Call sites compare against it
# by identity (``arg is DEFAULT``) so that ``None`` stays available as an
# explicit "disable this feature" value.
DEFAULT = _MethodDefault._DEFAULT_VALUE
"""Sentinel value indicating that a retry, timeout, or compression argument was unspecified,
so the default should be used."""


def _is_not_none_or_false(value):
    """Return ``True`` unless ``value`` is the ``None`` or ``False`` singleton.

    The check is by identity, so other falsy values such as ``0`` or ``""``
    still yield ``True``.
    """
    if value is None or value is False:
        return False
    return True


def _apply_decorators(func, decorators):
    """Wrap ``func`` with every usable decorator in ``decorators``.

    Entries that are ``None`` or ``False`` are skipped. The sequence is walked
    from last to first, so the first entry ends up as the outermost wrapper.

    Args:
        func (Callable): The function to decorate.
        decorators (Sequence): Decorators to apply; must support ``reversed``.

    Returns:
        Callable: ``func`` wrapped by each applicable decorator, or ``func``
            itself when none apply.
    """
    wrapped = func

    for candidate in reversed(decorators):
        if not _is_not_none_or_false(candidate):
            # Disabled slot (``None``/``False``): nothing to apply.
            continue
        wrapped = candidate(wrapped)

    return wrapped


class _GapicCallable(object):
    """Callable that applies retry, timeout, compression, and metadata logic.

    Args:
        target (Callable): The low-level RPC method.
        retry (google.api_core.retry.Retry): The default retry for the
            callable. If ``None``, this callable will not retry by default
        timeout (google.api_core.timeout.Timeout): The default timeout for the
            callable (i.e. duration of time within which an RPC must terminate
            after its start, not to be confused with deadline). If ``None``,
            this callable will not specify a timeout argument to the low-level
            RPC method.
        compression (grpc.Compression): The default compression for the callable.
            If ``None``, this callable will not specify a compression argument
            to the low-level RPC method unless one is supplied per call.
        metadata (Sequence[Tuple[str, str]]): Additional metadata that is
            provided to the RPC method on every invocation. This is merged with
            any metadata specified during invocation. If ``None``, no
            additional metadata will be passed to the RPC method.
    """

    def __init__(
        self,
        target,
        retry,
        timeout,
        compression,
        metadata=None,
    ):
        self._target = target
        self._retry = retry
        self._timeout = timeout
        self._compression = compression
        self._metadata = metadata

    def __call__(
        self, *args, timeout=DEFAULT, retry=DEFAULT, compression=DEFAULT, **kwargs
    ):
        """Invoke the low-level RPC with retry, timeout, compression, and metadata.

        Args:
            *args: Positional arguments forwarded to the target.
            timeout: Per-call timeout. ``DEFAULT`` selects the timeout given at
                construction; ``None`` disables it; an ``int``/``float`` is
                converted to a ``TimeToDeadlineTimeout``.
            retry: Per-call retry. ``DEFAULT`` selects the retry given at
                construction; ``None`` disables retrying.
            compression: Per-call compression. ``DEFAULT`` selects the
                compression given at construction; ``None`` means no
                ``compression`` keyword is sent to the target.
            **kwargs: Keyword arguments forwarded to the target. A ``metadata``
                entry, if present, is merged with the construction-time
                metadata.

        Returns:
            Whatever the (decorated) target returns.
        """
        # Resolve each "unspecified" argument to its construction-time default.
        if retry is DEFAULT:
            retry = self._retry

        if timeout is DEFAULT:
            timeout = self._timeout

        if compression is DEFAULT:
            compression = self._compression

        # Bare numbers are accepted as a convenience and promoted to a
        # timeout decorator.
        if isinstance(timeout, (int, float)):
            timeout = TimeToDeadlineTimeout(timeout=timeout)

        # Apply all applicable decorators (``None`` entries are skipped).
        wrapped_func = _apply_decorators(self._target, [retry, timeout])

        # Add the user agent metadata to the call.
        if self._metadata is not None:
            metadata = kwargs.get("metadata", [])
            # Due to the nature of invocation, None should be treated the same
            # as not specified.
            if metadata is None:
                metadata = []
            # Copy so the caller's sequence is never mutated.
            metadata = list(metadata)
            metadata.extend(self._metadata)
            kwargs["metadata"] = metadata

        # Gate on the *resolved* compression rather than on the default:
        # a per-call value must be honoured even when no default was
        # configured, and an explicit ``None`` must suppress the keyword even
        # when a default exists.
        if compression is not None:
            kwargs["compression"] = compression

        return wrapped_func(*args, **kwargs)


def wrap_method(
    func,
    default_retry=None,
    default_timeout=None,
    default_compression=None,
    client_info=client_info.DEFAULT_CLIENT_INFO,
    *,
    with_call=False,
):
    """Wrap an RPC method with common behavior.

    The returned callable adds error mapping, retry, timeout, compression and
    user-agent metadata handling on top of ``func``. It accepts optional
    ``retry``, ``timeout``, and ``compression`` keyword arguments that
    override the defaults given here on a per-call basis.

    For example::

        import google.api_core.gapic_v1.method
        from google.api_core import exceptions
        from google.api_core import retry

        # The original RPC method.
        def get_topic(name, timeout=None, metadata=None):
            request = publisher_v2.GetTopicRequest(name=name)
            return publisher_stub.GetTopic(
                request, timeout=timeout, metadata=metadata)

        default_retry = retry.Retry(deadline=60)
        default_timeout = 60.0  # a plain int/float is accepted as a timeout
        wrapped_get_topic = google.api_core.gapic_v1.method.wrap_method(
            get_topic, default_retry, default_timeout)

        # Execute get_topic with default retry and timeout:
        response = wrapped_get_topic(name)

        # Execute get_topic without doing any retrying but with the default
        # timeout:
        response = wrapped_get_topic(name, retry=None)

        # Execute get_topic but only retry on 5xx errors:
        my_retry = retry.Retry(retry.if_exception_type(
            exceptions.InternalServerError))
        response = wrapped_get_topic(name, retry=my_retry)

    The decorators are applied late, at call time. When
    ``wrapped_get_topic()`` is called:

    * ``get_topic()`` is first wrapped with the ``timeout`` into
      ``get_topic_with_timeout``.
    * ``get_topic_with_timeout`` is wrapped with the ``retry`` into
      ``get_topic_with_timeout_and_retry()``.
    * The final ``get_topic_with_timeout_and_retry`` is called passing through
      the ``args`` and ``kwargs``.

    The callstack is therefore::

        method.__call__() ->
            Retry.__call__() ->
                Timeout.__call__() ->
                    wrap_errors() ->
                        get_topic()

    If ``timeout`` or ``retry`` is ``None``, that decorator is not applied.
    For example, ``wrapped_get_topic(name, timeout=None, retry=None)`` is more
    or less equivalent to calling ``get_topic`` directly, but with error
    re-mapping.

    Args:
        func (Callable[Any]): The function to wrap. It should accept an
            optional ``timeout`` argument. If ``client_info`` is not ``None``,
            it must accept a ``metadata`` keyword argument; when compression
            is in use it must also accept a ``compression`` keyword argument.
        default_retry (Optional[google.api_core.Retry]): The default retry
            strategy. If ``None``, the method will not retry by default.
        default_timeout (Optional[google.api_core.Timeout]): The default
            timeout strategy. Can also be specified as an int or float. If
            ``None``, the method will not have timeout specified by default.
        default_compression (Optional[grpc.Compression]): The default
            grpc.Compression. If ``None``, the method will not have
            compression specified by default.
        client_info
            (Optional[google.api_core.gapic_v1.client_info.ClientInfo]):
                Client information used to create a user-agent string that's
                passed as gRPC metadata to the method. If unspecified, then
                a sane default will be used. If ``None``, then no user agent
                metadata will be provided to the RPC method.
        with_call (bool): If True, wrapped grpc.UnaryUnaryMulticallables will
            return a tuple of (response, grpc.Call) instead of just the response.
            This is useful for extracting trailing metadata from unary calls.
            Defaults to False.

    Returns:
        Callable: A new callable that takes optional ``retry``, ``timeout``,
            and ``compression`` arguments and applies the common error
            mapping, retry, timeout, compression, and metadata behavior to
            the low-level RPC method.

    Raises:
        ValueError: If ``with_call`` is True but ``func`` has no ``with_call``
            attribute.
    """
    if with_call:
        # Swap in the variant that also returns the call object; only
        # callables exposing ``with_call`` support this.
        try:
            func = func.with_call
        except AttributeError as exc:
            raise ValueError(
                "with_call=True is only supported for unary calls."
            ) from exc

    error_mapped = grpc_helpers.wrap_errors(func)

    # ``None`` means "send no user-agent metadata at all".
    user_agent_metadata = (
        None if client_info is None else [client_info.to_grpc_metadata()]
    )

    gapic_callable = _GapicCallable(
        error_mapped,
        default_retry,
        default_timeout,
        default_compression,
        metadata=user_agent_metadata,
    )

    # Copy name/docstring/etc. from the error-mapped function onto the
    # callable instance and hand that instance back.
    return functools.update_wrapper(gapic_callable, error_mapped)