File: grpc.rb

package info (click to toggle)
ruby-httpx 1.7.2-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,816 kB
  • sloc: ruby: 12,209; makefile: 4
file content (282 lines) | stat: -rw-r--r-- 8,815 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
# frozen_string_literal: true

module HTTPX
  # Error carrying the status, details and metadata it was built with;
  # all three are also embedded in the exception message.
  GRPCError = Class.new(Error) do
    attr_reader :status, :details, :metadata

    # status, details and metadata are stored as given and exposed via readers.
    def initialize(status, details, metadata)
      @status, @details, @metadata = status, details, metadata
      message = "GRPC error, code=#{status}, details=#{details}, metadata=#{metadata}"
      super(message)
    end
  end

  module Plugins
    #
    # This plugin adds DSL to build GRPC interfaces.
    #
    # https://gitlab.com/os85/httpx/wikis/GRPC
    #
    module GRPC
      unless String.method_defined?(:underscore)
        # Refinement supplying String#underscore when nothing else defines it.
        module StringExtensions
          refine String do
            # Returns a snake_case copy of the receiver. The work happens on a
            # duplicate because the receiver might be frozen.
            def underscore
              dup.tap do |snake|
                snake.gsub!(/([A-Z]+)([A-Z][a-z])/, '\1_\2')
                snake.gsub!(/([a-z\d])([A-Z])/, '\1_\2')
                snake.tr!("-", "_")
                snake.downcase!
              end
            end
          end
        end
        using StringExtensions
      end

      # Default RPC deadline. It is multiplied by 1000 to build the
      # millisecond "grpc-timeout" header, so it is expressed in seconds.
      DEADLINE = 60
      # Fallback method names invoked on the rpc input/output types when an
      # rpc does not supply its own :marshal_method / :unmarshal_method.
      MARSHAL_METHOD = :encode
      UNMARSHAL_METHOD = :decode
      # Base headers merged into the headers of every request built by this plugin.
      HEADERS = {
        "content-type" => "application/grpc",
        "te" => "trailers",
        "accept" => "application/grpc",
        # metadata fits here
        # ex "foo-bin" => base64("bar")
      }.freeze

      # Loads stringio and the plugin's own helper files.
      def self.load_dependencies(*)
        %w[
          stringio
          httpx/plugins/grpc/grpc_encoding
          httpx/plugins/grpc/message
          httpx/plugins/grpc/call
        ].each { |library| require library }
      end

      # Enables the plugins this one builds on: :persistent first, then :stream.
      def self.configure(klass)
        %i[persistent stream].each { |dependency| klass.plugin(dependency) }
      end

      # Returns the given options merged with this plugin's defaults.
      def self.extra_options(options)
        options.merge(
          fallback_protocol: "h2",
          grpc_rpcs: {}.freeze,
          grpc_compression: false,
          grpc_deadline: DEADLINE
        )
      end

      # Validation/coercion hooks for the options introduced by this plugin.
      # Each hook receives the raw value and returns the value to store.
      module OptionsMethods
        private

        # Coerces the service name to a String.
        def option_grpc_service(value)
          String(value)
        end

        # Booleans pass through untouched; anything else is stringified.
        def option_grpc_compression(value)
          case value
          when true, false
            value
          else
            value.to_s
          end
        end

        # Builds a Hash out of the given value.
        def option_grpc_rpcs(value)
          Hash[value]
        end

        # Accepts only positive values (Float::INFINITY included).
        #
        # Raises TypeError for zero, negative values, and for values that
        # cannot be checked for positivity at all (nil, strings, ...), instead
        # of leaking a NoMethodError for the latter.
        def option_grpc_deadline(value)
          raise TypeError, ":grpc_deadline must be positive" unless value.respond_to?(:positive?) && value.positive?

          value
        end

        # Accepts any object responding to #call; raises TypeError otherwise.
        def option_call_credentials(value)
          raise TypeError, ":call_credentials must respond to #call" unless value.respond_to?(:call)

          value
        end
      end

      module ResponseMethods
        # Hash built from the trailers last passed to #merge_headers
        # (nil until that method has been called).
        attr_reader :trailing_metadata

        # Keeps a Hash copy of the trailers before handing them to the
        # parent implementation.
        def merge_headers(trailers)
          @trailing_metadata = Hash[trailers]
          super(trailers)
        end
      end

      module RequestBodyMethods
        # After the regular body setup, wraps the body in the GRPC encoding.
        # When a "grpc-encoding" header is present, the class-level deflater
        # factory is asked for a deflating body first; the message is flagged
        # as compressed unless that factory returned nil.
        def initialize(*, **)
          super

          encoding = @headers["grpc-encoding"]
          deflated = encoding ? self.class.initialize_deflater_body(@body, encoding) : nil

          @body = Transcoder::GRPCEncoding.encode(deflated || @body, compressed: !deflated.nil?)
        end
      end

      module InstanceMethods
        # Returns a session configured with the given TLS material.
        #
        # ca_path is passed through as the :ca_file ssl option. key and cert
        # may each be a path to an existing file (whose content is read) or the
        # raw key/certificate data itself. Extra keyword arguments are passed
        # along as additional ssl options.
        def with_channel_credentials(ca_path, key = nil, cert = nil, **ssl_opts)
          # @type var ssl_params: ::Hash[::Symbol, untyped]
          ssl_params = ssl_opts.merge(ca_file: ca_path)

          if key
            key_data = File.file?(key) ? File.read(key) : key
            ssl_params[:key] = OpenSSL::PKey.read(key_data)
          end

          if cert
            cert_data = File.file?(cert) ? File.read(cert) : cert
            ssl_params[:cert] = OpenSSL::X509::Certificate.new(cert_data)
          end

          with(ssl: ssl_params)
        end

        # Returns a new session which knows about the given rpc.
        #
        # The returned session is an instance of an anonymous subclass of the
        # current session class. It responds to the underscored rpc name and,
        # when different, to the original rpc name as well; both delegate to
        # #rpc_execute.
        #
        # rpc_name is the rpc name (anything responding to #to_s). input and
        # output are stored in the rpc registry as given. opts are per-rpc
        # options, merged over the session-level deadline.
        #
        # Raises Error when an rpc with the same underscored name is already
        # registered.
        def rpc(rpc_name, input, output, **opts)
          rpc_name = rpc_name.to_s
          local_rpc_name = rpc_name.underscore

          # The registry is keyed by the underscored name, so that is the key
          # which must be checked. Checking the original name would never
          # detect a duplicate of e.g. "SayHello" (stored as "say_hello").
          raise Error, "rpc #{rpc_name} already defined" if @options.grpc_rpcs.key?(local_rpc_name)

          rpc_opts = {
            deadline: @options.grpc_deadline,
          }.merge(opts)

          session_class = Class.new(self.class) do
            # define rpc method with ruby style name
            class_eval(<<-OUT, __FILE__, __LINE__ + 1)
              def #{local_rpc_name}(input, **opts)              # def grpc_action(input, **opts)
                rpc_execute("#{local_rpc_name}", input, **opts) #   rpc_execute("grpc_action", input, **opts)
              end                                               # end
            OUT

            # define rpc method with original name
            unless local_rpc_name == rpc_name
              class_eval(<<-OUT, __FILE__, __LINE__ + 1)
                def #{rpc_name}(input, **opts)                    # def grpcAction(input, **opts)
                  rpc_execute("#{local_rpc_name}", input, **opts) #   rpc_execute("grpc_action", input, **opts)
                end                                               # end
              OUT
            end
          end

          session_class.new(@options.merge(
                              grpc_rpcs: @options.grpc_rpcs.merge(
                                local_rpc_name => [rpc_name, input, output, rpc_opts]
                              ).freeze
                            ))
        end

        # Returns a session bound to the given origin ("host:port").
        #
        # The scheme is "https" when the session has ssl options set and
        # "http" otherwise. service may be a plain service name or an object
        # responding to #rpc_descs; in the latter case every described rpc is
        # registered via #rpc and the object's #service_name is used as the
        # service name. compression is passed on as the :grpc_compression
        # option.
        def build_stub(origin, service: nil, compression: false)
          scheme = @options.ssl.empty? ? "http" : "https"
          stub_origin = URI.parse("#{scheme}://#{origin}")

          stub = self
          service_name = service

          if service && service.respond_to?(:rpc_descs)
            # it's a grpc generic service
            service.rpc_descs.each do |rpc_name, rpc_desc|
              rpc_opts = {
                marshal_method: rpc_desc.marshal_method,
                unmarshal_method: rpc_desc.unmarshal_method,
              }

              request_type = rpc_desc.input
              request_type = request_type.type if request_type.respond_to?(:type)

              response_type = rpc_desc.output
              # an output which wraps its type (responds to #type) marks the rpc as a stream
              if response_type.respond_to?(:type)
                rpc_opts[:stream] = true
                response_type = response_type.type
              end

              stub = stub.rpc(rpc_name, request_type, response_type, **rpc_opts)
            end

            service_name = service.service_name
          end

          stub.with(origin: stub_origin, grpc_service: service_name, grpc_compression: compression)
        end

        # Performs a single rpc call and returns it wrapped in a GRPC::Call.
        #
        # rpc_method is the rpc name/path and input the request payload, both
        # handed to #build_grpc_request. deadline defaults to the session's
        # :grpc_deadline option, so that sessions configured with a custom
        # deadline honour it here just like rpcs declared via #rpc do.
        # metadata is passed to #build_grpc_request. The remaining options go
        # to both request building and request sending.
        #
        # Unless the :stream option is set, the response status is checked
        # eagerly via #raise_for_status.
        def execute(rpc_method, input,
                    deadline: @options.grpc_deadline,
                    metadata: nil,
                    **opts)
          grpc_request = build_grpc_request(rpc_method, input, deadline: deadline, metadata: metadata, **opts)
          response = request(grpc_request, **opts)
          response.raise_for_status unless opts[:stream]
          GRPC::Call.new(response)
        end

        private

        # Executes an rpc previously registered under rpc_name (the
        # underscored name).
        #
        # The input is marshalled with the rpc's input type: an input
        # responding to #each becomes a lazy enumerator of marshalled
        # messages, anything else is marshalled as a single message. The
        # returned call gets the output type's unmarshal method as decoder.
        # Per-call opts take precedence over the options stored with the rpc.
        def rpc_execute(rpc_name, input, **opts)
          rpc_name, input_enc, output_enc, rpc_opts = @options.grpc_rpcs[rpc_name]

          exec_opts = rpc_opts.merge(opts)

          # the (un)marshal method names are not request options, so pull them out
          marshal_method = exec_opts.delete(:marshal_method) || MARSHAL_METHOD
          unmarshal_method = exec_opts.delete(:unmarshal_method) || UNMARSHAL_METHOD

          marshal = ->(message) { input_enc.__send__(marshal_method, message) }

          messages =
            if input.respond_to?(:each)
              Enumerator.new do |yielder|
                input.each { |message| yielder << marshal.call(message) }
              end
            else
              marshal.call(input)
            end

          execute(rpc_name, messages, **exec_opts).tap do |call|
            call.decoder = output_enc.method(unmarshal_method)
          end
        end

        # Builds the POST request for an rpc call.
        #
        # The request path is "/<grpc_service>/<rpc_method>" (just
        # "/<rpc_method>" when no service is configured), resolved against a
        # copy of the session origin. Headers are assembled in this order: the
        # base GRPC headers, the accepted encodings, the timeout (skipped for
        # an infinite deadline), caller metadata, the request encoding and
        # finally whatever the call credentials return.
        def build_grpc_request(rpc_method, input, deadline:, metadata: nil, **opts)
          path = rpc_method.start_with?("/") ? rpc_method : "/#{rpc_method}"
          path = "/#{@options.grpc_service}#{path}" if @options.grpc_service

          uri = @options.origin.dup
          uri.path = path

          headers = HEADERS.merge(
            "grpc-accept-encoding" => ["identity", *@options.supported_compression_formats]
          )

          # the deadline is converted to whole milliseconds for the header
          headers["grpc-timeout"] = "#{(deadline * 1000.0).to_i}m" unless deadline == Float::INFINITY

          headers = headers.merge(metadata.transform_keys(&:to_s)) if metadata

          # prepare compressor: a bare `true` selects gzip
          compression = @options.grpc_compression
          compression = "gzip" if compression == true
          headers["grpc-encoding"] = compression if compression

          credentials = @options.call_credentials
          headers.merge!(credentials.call.transform_keys(&:to_s)) if credentials

          build_request("POST", uri, headers: headers, body: input, **opts)
        end
      end
    end
    register_plugin :grpc, GRPC
  end
end