File: client_stub.rb

package info (click to toggle)
ruby-gapic-common 1.3.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 392 kB
  • sloc: ruby: 2,081; makefile: 4
file content (329 lines) | stat: -rw-r--r-- 14,109 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require "googleauth"
require "gapic/logging_concerns"
require "gapic/rest/faraday_middleware"
require "gapic/universe_domain_concerns"
require "faraday/retry"

module Gapic
  module Rest
    ##
    # A class for making REST calls through Faraday
    # ClientStub's responsibilities:
    #   - wrap Faraday methods with a bounded explicit interface
    #   - store service endpoint and create full url for the request
    #   - store credentials and add auth information to the request
    #
    class ClientStub
      include UniverseDomainConcerns
      include LoggingConcerns

      ##
      # Creates a stub bound to a single service endpoint.
      #
      # The endpoint resolved by the universe-domain setup is given an
      # `https://` scheme when it has none and has a trailing slash removed;
      # the result is the base URL of the Faraday connection and is also
      # handed to the logging setup.
      #
      # @param credentials [Google::Auth::Credentials]
      #   Credentials to send with calls in form of a googleauth credentials object.
      #   (see the [googleauth docs](https://googleapis.dev/ruby/googleauth/latest/index.html))
      #   When the resolved credentials are a Symbol, no authorization
      #   middleware is installed on the connection.
      # @param endpoint [String] The endpoint of the API. Overrides any endpoint_template.
      # @param endpoint_template [String] The endpoint of the API, where the
      #   universe domain component of the hostname is marked by the string in
      #   the constant {UniverseDomainConcerns::ENDPOINT_SUBSTITUTION}.
      # @param universe_domain [String] The universe domain in which calls
      #   should be made. Defaults to `googleapis.com`.
      # @param numeric_enums [Boolean] Whether to signal the server to JSON-encode enums as ints
      # @param raise_faraday_errors [Boolean]
      #   Whether to raise Faraday errors instead of wrapping them in `Gapic::Rest::Error`.
      #   Added for backwards compatibility.
      #   Default is `true`. All REST clients (except for old versions of `google-cloud-compute-v1`)
      #   should explicitly set this parameter to `false`.
      # @param logging_system [String, nil] Passed to the logging setup as the
      #   system name (type assumed to be a String; confirm against LoggingConcerns).
      # @param service_name [String, nil] Passed to the logging setup as the
      #   service (type assumed to be a String; confirm against LoggingConcerns).
      # @param logger [Logger,:default,nil] An explicit logger to use, or one
      #   of the values `:default` (the default) to construct a default logger,
      #   or `nil` to disable logging explicitly.
      #
      # @yield [Faraday::Connection] the freshly built connection, for further customization
      #
      def initialize credentials:,
                     endpoint: nil,
                     endpoint_template: nil,
                     universe_domain: nil,
                     numeric_enums: false,
                     raise_faraday_errors: true,
                     logging_system: nil,
                     service_name: nil,
                     logger: :default
        setup_universe_domain universe_domain: universe_domain, endpoint: endpoint,
                              endpoint_template: endpoint_template, credentials: credentials

        # Normalize the resolved endpoint into a base URL: ensure a scheme, drop a trailing slash.
        resolved = self.endpoint
        with_scheme = /^https?:/.match?(resolved) ? resolved : "https://#{resolved}"
        base_url = with_scheme.sub(%r{/$}, "")

        setup_logging logger: logger, system_name: logging_system, service: service_name,
                      endpoint: base_url, client_id: object_id

        @numeric_enums = numeric_enums
        @raise_faraday_errors = raise_faraday_errors

        @connection = Faraday.new(url: base_url) do |faraday|
          faraday.headers = { "Content-Type" => "application/json" }
          # Symbol credentials carry no auth information, so skip the auth middleware for them.
          auth = self.credentials
          faraday.request :google_authorization, auth unless auth.is_a? ::Symbol
          faraday.request :retry
          faraday.response :raise_error
          faraday.adapter :net_http
        end

        yield @connection if block_given?
      end

      ##
      # Issues a GET request. GET requests are always sent without a body.
      #
      # @param uri [String] uri to send this request to
      # @param params [Hash] query string parameters for the request
      # @param options [::Gapic::CallOptions,Hash] gapic options to be applied
      #     to the REST call. Currently only timeout and headers are supported.
      # @param method_name [String, nil] name of the called method, forwarded
      #     to {#make_http_request} where it is used for logging
      # @return [Faraday::Response]
      def make_get_request uri:, params: {}, options: {}, method_name: nil
        make_http_request :get,
                          uri: uri,
                          body: nil,
                          params: params,
                          options: options,
                          method_name: method_name
      end

      ##
      # Issues a DELETE request. DELETE requests are always sent without a body.
      #
      # @param uri [String] uri to send this request to
      # @param params [Hash] query string parameters for the request
      # @param options [::Gapic::CallOptions,Hash] gapic options to be applied
      #     to the REST call. Currently only timeout and headers are supported.
      # @param method_name [String, nil] name of the called method, forwarded
      #     to {#make_http_request} where it is used for logging
      # @return [Faraday::Response]
      def make_delete_request uri:, params: {}, options: {}, method_name: nil
        make_http_request :delete,
                          uri: uri,
                          body: nil,
                          params: params,
                          options: options,
                          method_name: method_name
      end

      ##
      # Issues a PATCH request. Unlike POST and PUT, `body` has no default and
      # must always be given (pass `nil` explicitly for a body-less request).
      #
      # @param uri [String] uri to send this request to
      # @param body [String, nil] a body to send with the request, nil for requests without a body
      # @param params [Hash] query string parameters for the request
      # @param options [::Gapic::CallOptions,Hash] gapic options to be applied
      #     to the REST call. Currently only timeout and headers are supported.
      # @param method_name [String, nil] name of the called method, forwarded
      #     to {#make_http_request} where it is used for logging
      # @return [Faraday::Response]
      def make_patch_request uri:, body:, params: {}, options: {}, method_name: nil
        make_http_request :patch,
                          uri: uri,
                          body: body,
                          params: params,
                          options: options,
                          method_name: method_name
      end

      ##
      # Issues a POST request.
      #
      # @param uri [String] uri to send this request to
      # @param body [String, nil] a body to send with the request, nil for requests without a body
      # @param params [Hash] query string parameters for the request
      # @param options [::Gapic::CallOptions,Hash] gapic options to be applied
      #     to the REST call. Currently only timeout and headers are supported.
      # @param method_name [String, nil] name of the called method, forwarded
      #     to {#make_http_request} where it is used for logging
      # @return [Faraday::Response]
      def make_post_request uri:, body: nil, params: {}, options: {}, method_name: nil
        make_http_request :post,
                          uri: uri,
                          body: body,
                          params: params,
                          options: options,
                          method_name: method_name
      end

      ##
      # Issues a PUT request.
      #
      # @param uri [String] uri to send this request to
      # @param body [String, nil] a body to send with the request, nil for requests without a body
      # @param params [Hash] query string parameters for the request
      # @param options [::Gapic::CallOptions,Hash] gapic options to be applied
      #     to the REST call. Currently only timeout and headers are supported.
      # @param method_name [String, nil] name of the called method, forwarded
      #     to {#make_http_request} where it is used for logging
      # @return [Faraday::Response]
      def make_put_request uri:, body: nil, params: {}, options: {}, method_name: nil
        make_http_request :put,
                          uri: uri,
                          body: body,
                          params: params,
                          options: options,
                          method_name: method_name
      end

      ##
      # @private
      # Sends a http request via Faraday, logging every attempt and retrying
      # failed attempts while the retry policy allows it and the overall
      # deadline (derived from `options.timeout`) has not passed.
      #
      # @param verb [Symbol] http verb
      # @param uri [String] uri to send this request to
      # @param body [String, nil] a body to send with the request, nil for requests without a body
      # @param params [Hash] query string parameters for the request
      # @param options [::Gapic::CallOptions,Hash] gapic options to be applied to the REST call.
      #   Anything that is not a `::Gapic::CallOptions` is converted via `to_h`.
      # @param is_server_streaming [Boolean] flag if method is streaming
      # @param method_name [String, nil] name of the called method; only passed on to the
      #   request/response logging helpers
      # @yieldparam chunk [String] The chunk of data received during server streaming.
      # @return [Faraday::Response]
      # @raise [::Faraday::Error] the original Faraday error, when the stub was
      #   created with `raise_faraday_errors: true`
      # @raise [Gapic::Rest::DeadlineExceededError] for a Faraday timeout, when
      #   `raise_faraday_errors` is false
      # @raise [::Gapic::Rest::Error] for any other Faraday error that is not
      #   retried, when `raise_faraday_errors` is false
      def make_http_request verb,
                            uri:, body:, params:, options:,
                            is_server_streaming: false, method_name: nil,
                            &block
        # Converts hash and nil to an options object
        options = ::Gapic::CallOptions.new(**options.to_h) unless options.is_a? ::Gapic::CallOptions
        # The deadline is fixed once, so that all attempts share one time budget.
        deadline = calculate_deadline options
        # Last error that triggered a retry; reported as root cause if a later attempt times out.
        retried_exception = nil
        # Time left until the deadline (nil when there is no deadline).
        next_timeout = get_timeout deadline
        # One request id is shared by all attempts; try_number tells them apart in the logs.
        request_id = LoggingConcerns.random_uuid4
        try_number = 1

        # These locals live outside the begin block on purpose: `retry` re-runs
        # only the begin body, so updates made in the rescue clauses carry over.
        begin
          log_request method_name, request_id, try_number, body, options.metadata
          response = base_make_http_request verb: verb, uri: uri, body: body,
                                            params: params, metadata: options.metadata,
                                            timeout: next_timeout,
                                            is_server_streaming: is_server_streaming,
                                            &block
          log_response method_name, request_id, try_number, response, is_server_streaming
          response
        rescue ::Faraday::TimeoutError => e
          # Listed before the generic Faraday::Error clause so that timeouts are never retried.
          log_response method_name, request_id, try_number, e, is_server_streaming
          raise if @raise_faraday_errors
          raise Gapic::Rest::DeadlineExceededError.wrap_faraday_error e, root_cause: retried_exception
        rescue ::Faraday::Error => e
          log_response method_name, request_id, try_number, e, is_server_streaming
          # Recompute the remaining budget; the failed attempt has consumed part of it.
          next_timeout = get_timeout deadline

          # Retry only while time remains and the retry policy accepts this error.
          # NOTE(review): the policy call presumably also performs the backoff delay —
          # confirm against the retry policy implementation.
          if check_retry?(next_timeout) && options.retry_policy.call(e)
            retried_exception = e
            try_number += 1
            retry
          end

          raise if @raise_faraday_errors
          raise ::Gapic::Rest::Error.wrap_faraday_error e
        end
      end

      ##
      # @private
      # Performs a single http request through the Faraday connection, without
      # any retry or logging.
      #
      # When the stub was created with `numeric_enums` and the `"$alt"` query
      # parameter is either absent or equal to `"json"`, the parameter is set
      # to `"json;enum-encoding=int"` on a copy of `params`.
      #
      # @param verb [Symbol] http verb
      # @param uri [String] uri to send this request to
      # @param body [String, nil] a body to send with the request, nil for requests without a body
      # @param params [Hash] query string parameters for the request
      # @param metadata [Hash] additional headers for the request, merged over the request's headers
      # @param timeout [Numeric, nil] request timeout in seconds; applied only when positive
      # @param is_server_streaming [Boolean] flag if method is streaming
      # @yieldparam chunk [String] The chunk of data received during server streaming.
      # @return [Faraday::Response]
      def base_make_http_request verb:, uri:, body:, params:, metadata:,
                                 timeout:, is_server_streaming: false
        # A missing "$alt" is treated like "json": both get the integer enum encoding.
        int_enums = @numeric_enums && params.fetch("$alt", "json") == "json"
        query = int_enums ? params.merge({ "$alt" => "json;enum-encoding=int" }) : params

        @connection.send verb, uri do |request|
          request.params = query if query.any?
          request.body = body unless body.nil?
          request.headers = request.headers.merge metadata
          request.options.timeout = timeout if timeout&.positive?
          next unless is_server_streaming

          # Hand every received chunk to the caller's block; the byte counter is not needed.
          request.options.on_data = proc { |chunk, _received_bytes| yield chunk }
        end
      end

      private

      ##
      # Turns the call's timeout into an absolute deadline on the monotonic clock.
      #
      # @param options [Gapic::CallOptions] call options for this call
      #
      # @return [Numeric, nil] Deadline as a `Process::CLOCK_MONOTONIC` reading,
      #   or nil when the timeout is nil or negative
      def calculate_deadline options
        budget = options.timeout
        return if budget.nil? || budget.negative?

        Process.clock_gettime(Process::CLOCK_MONOTONIC) + budget
      end

      ##
      # Computes the time left until the deadline, for use as a Faraday timeout.
      # The result is negative once the deadline has passed.
      #
      # @param deadline [Numeric, nil] deadline as a `Process::CLOCK_MONOTONIC` reading
      #
      # @return [Numeric, nil] Remaining time (seconds), or nil when there is no deadline
      def get_timeout deadline
        deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC) unless deadline.nil?
      end

      ##
      # Tells whether there is still time for another attempt.
      #
      # @param timeout [Numeric, nil] remaining time in seconds, nil when unlimited
      #
      # @return [Boolean] true when there is no time limit or the remaining time is positive
      def check_retry? timeout
        timeout.nil? || timeout.positive?
      end

      ##
      # Logs an outgoing request attempt. Does nothing unless a stub logger is
      # present and enabled.
      #
      # An info entry announces the attempt; a debug entry with the body and
      # headers follows unless both are empty.
      #
      # @param method_name [String, nil] name of the called method
      # @param request_id [String] identifier shared by all attempts of one call
      # @param try_number [Integer] attempt counter
      # @param body [#to_s, nil] request body
      # @param metadata [#to_h, nil] request headers; anything that cannot be
      #   converted with `to_h` is logged as an empty hash
      def log_request method_name, request_id, try_number, body, metadata
        return unless stub_logger&.enabled?

        stub_logger.info do |log_entry|
          log_entry.set_system_name
          log_entry.set_service
          log_entry.set "rpcName", method_name
          log_entry.set "retryAttempt", try_number
          log_entry.set "requestId", request_id
          log_entry.message = "Sending request to #{log_entry.service}.#{method_name} (try #{try_number})"
        end

        payload = body.to_s
        # Best effort: logging must not fail because of unusual metadata.
        headers =
          begin
            metadata.to_h
          rescue StandardError
            {}
          end
        return if [payload, headers].all?(&:empty?)

        stub_logger.debug do |log_entry|
          log_entry.set "requestId", request_id
          log_entry.set "request", payload
          log_entry.set "headers", headers
          log_entry.message = "(request payload as JSON)"
        end
      end

      ##
      # Logs the outcome of a request attempt. Does nothing unless a stub
      # logger is present and enabled.
      #
      # An info entry reports an error, a stream, or a plain response. For
      # non-streaming results that respond to `body`, a debug entry with the
      # body follows unless the body is empty.
      #
      # @param method_name [String, nil] name of the called method
      # @param request_id [String] identifier shared by all attempts of one call
      # @param try_number [Integer] attempt counter
      # @param response [Object, StandardError] the response, or the error that was raised
      # @param is_server_streaming [Boolean] flag if method is streaming
      def log_response method_name, request_id, try_number, response, is_server_streaming
        return unless stub_logger&.enabled?

        stub_logger.info do |log_entry|
          log_entry.set_system_name
          log_entry.set_service
          log_entry.set "rpcName", method_name
          log_entry.set "retryAttempt", try_number
          log_entry.set "requestId", request_id
          log_entry.message =
            if response.is_a? StandardError
              log_entry.set "exception", response.to_s
              "Received error for #{log_entry.service}.#{method_name} (try #{try_number}): #{response}"
            elsif is_server_streaming
              "Receiving stream for #{log_entry.service}.#{method_name} (try #{try_number})"
            else
              "Received response for #{log_entry.service}.#{method_name} (try #{try_number})"
            end
        end

        # Streamed bodies arrive in chunks elsewhere, and errors expose no body to log.
        return if is_server_streaming
        return unless response.respond_to? :body

        payload = response.body.to_s
        return if payload.empty?

        stub_logger.debug do |log_entry|
          log_entry.set "requestId", request_id
          log_entry.set "response", payload
          log_entry.message = "(response payload as JSON)"
        end
      end
    end
  end
end