File: stream_bidi.rb

package info (click to toggle)
ruby-httpx 1.7.2-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,816 kB
  • sloc: ruby: 12,209; makefile: 4
file content (402 lines) | stat: -rw-r--r-- 10,443 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
# frozen_string_literal: true

module HTTPX
  module Plugins
    #
    # This plugin adds support for bidirectional HTTP/2 streams.
    #
    # https://gitlab.com/os85/httpx/wikis/StreamBidi
    #
    # It is required that the request body allows chunks to be buffered (i.e., responds to +#<<(chunk)+).
    module StreamBidi
      # Extension of the Connection::HTTP2 class, which adds functionality to
      # deal with a request that can't be drained and must be interleaved with
      # the response streams.
      #
      # The stream keeps sending DATA frames while there's data; when there isn't any,
      # the stream is kept open; it must be explicitly closed by the end user.
      #
      module HTTP2Methods
        # Creates the mutex (+@lock+) used by the lock-aware method wrappers below.
        def initialize(*)
          super
          @lock = Thread::Mutex.new
        end

        # Generates a lock-aware override for each listed public method. When the +stream+
        # option is off, or the calling thread already owns +@lock+, the override defers
        # straight to +super+; otherwise +super+ runs inside +@lock.synchronize+.
        %i[close empty? exhausted? send <<].each do |lock_meth|
          class_eval(<<-METH, __FILE__, __LINE__ + 1)
            # lock.aware version of +#{lock_meth}+
            def #{lock_meth}(*)                # def close(*)
              return super unless @options.stream

              return super if @lock.owned?

              # small race condition between
              # checking for ownership and
              # acquiring lock.
              # TODO: fix this at the parser.
              @lock.synchronize { super }
            end
          METH
        end

        private

        # Same lock-aware wrapping as above, applied to the listed methods, which are
        # declared private through the explicit +private def+.
        # NOTE(review): the explicit +private def+ is presumably needed because code evaluated
        # from a string does not pick up the bare +private+ above - confirm.
        %i[join_headers join_trailers join_body].each do |lock_meth|
          class_eval(<<-METH, __FILE__, __LINE__ + 1)
            # lock.aware version of +#{lock_meth}+
            private def #{lock_meth}(*)                # private def join_headers(*)
              return super unless @options.stream

              return super if @lock.owned?

              # small race condition between
              # checking for ownership and
              # acquiring lock.
              # TODO: fix this at the parser.
              @lock.synchronize { super }
            end
          METH
        end

        # When the +stream+ option is set, registers a callback through
        # +request.flush_buffer_on_body+ before deferring to +super+. The callback is a no-op
        # until +request.headers_sent+ is truthy; after that it calls +handle(request, stream)+
        # and emits +:flush_buffer+.
        def handle_stream(stream, request)
          return super unless @options.stream

          request.flush_buffer_on_body do
            next unless request.headers_sent

            handle(request, stream)

            emit(:flush_buffer)
          end
          super
        end

        # when there are no more chunks, it marks the buffer as full: once +super+ has dealt
        # with the current +chunk+ and there is no +next_chunk+, the request is moved to
        # +:waiting_for_chunk+ and +:buffer_full+ is thrown.
        # NOTE(review): the matching +catch(:buffer_full)+ is not visible in this file.
        def send_chunk(request, stream, chunk, next_chunk)
          return super unless @options.stream

          super

          return if next_chunk

          request.transition(:waiting_for_chunk)
          throw(:buffer_full)
        end

        # sets end-stream flag when the request is closed and there is no further chunk
        # pending; without the +stream+ option the decision is left to +super+.
        def end_stream?(request, next_chunk)
          return super unless @options.stream

          request.closed? && next_chunk.nil?
        end
      end

      # BidiBuffer is a thread-safe Buffer which can receive data from any thread.
      #
      # It uses a dual-buffer strategy with mutex protection:
      # - +@buffer+ is the main buffer, protected by +@buffer_mutex+
      # - +@oob_buffer+ receives data when +@buffer_mutex+ is contended
      #
      # This allows non-blocking writes from any thread while maintaining thread safety.
      class BidiBuffer < Buffer
        # Sets up the two mutexes and the out-of-band buffer on top of the regular Buffer state.
        def initialize(*)
          super
          @buffer_mutex = Thread::Mutex.new
          @oob_mutex = Thread::Mutex.new
          @oob_buffer = "".b
        end

        # buffers the +chunk+ to be sent (thread-safe, non-blocking).
        #
        # When +@buffer_mutex+ can be taken immediately, the chunk goes to the main buffer;
        # otherwise it is parked in +@oob_buffer+ until the next #rebuffer.
        def <<(chunk)
          if @buffer_mutex.try_lock
            begin
              super
            ensure
              @buffer_mutex.unlock
            end
          else
            # another thread holds the lock, use OOB buffer to avoid blocking
            @oob_mutex.synchronize { @oob_buffer << chunk }
          end
        end

        # reconciles the main and secondary buffer (thread-safe, callable from any thread).
        def rebuffer
          @buffer_mutex.synchronize do
            @oob_mutex.synchronize do
              return if @oob_buffer.empty?

              @buffer << @oob_buffer
              @oob_buffer.clear
            end
          end
        end

        # Wraps every other public Buffer method so that it runs while holding +@buffer_mutex+.
        #
        # The set difference MUST be parenthesized: without the parentheses the +do+ block
        # binds to +%i[<<].each+, which would iterate over +:<<+ only and replace the
        # non-blocking +#<<+ defined above with a blocking one, leaving all the remaining
        # Buffer methods unsynchronized.
        (Buffer.instance_methods - Object.instance_methods - %i[<<]).each do |meth|
          class_eval(<<-MOD, __FILE__, __LINE__ + 1)
            def #{meth}(*) # def empty?
              @buffer_mutex.synchronize { super }
            end
          MOD
        end
      end

      # Proxy to wake up the session main loop when one
      # of the connections has buffered data to write. It abides by the HTTPX::_Selectable API,
      # which allows it to be registered in the selector alongside actual HTTP-based
      # HTTPX::Connection objects.
      class Signal
        attr_reader :error

        # Opens the pipe pair backing the signal; the signal starts open and error-free.
        def initialize
          @pipe_read, @pipe_write = IO.pipe
          @error = nil
          @closed = false
        end

        # +:closed+ once terminated, +:open+ otherwise.
        def state
          return :closed if @closed

          :open
        end

        # noop
        def log(**, &_); end

        # IO object to be monitored (the read end of the pipe).
        def to_io
          @pipe_read.to_io
        end

        # writes a single byte into the pipe, making the read end readable.
        def wakeup
          @pipe_write.write("\0") unless @closed
        end

        # consumes one byte previously written by #wakeup.
        def call
          @pipe_read.readpartial(1) unless @closed
        end

        # only ever interested in reads; no interests once closed.
        def interests
          :r unless @closed
        end

        # no timeout.
        def timeout; end

        def inflight?
          !@closed
        end

        # closes both ends of the pipe and marks the signal as closed (only once).
        def terminate
          return if @closed

          [@pipe_write, @pipe_read].each(&:close)
          @closed = true
        end

        # stores the +error+ and shuts the signal down.
        def on_error(error)
          @error = error
          terminate
        end

        # noop (the owner connection will take care of it)
        def handle_socket_timeout(interval); end
      end

      # Plugin hook: bidirectional streams build on top of the +:stream+ plugin,
      # which gets loaded into +klass+.
      def self.load_dependencies(klass)
        klass.plugin(:stream)
      end

      # Plugin hook: returns +options+ merged with an "h2" fallback protocol.
      def self.extra_options(options)
        options.merge(fallback_protocol: "h2")
      end

      module InstanceMethods
        # Builds the wake-up Signal before running the regular initialization.
        def initialize(*)
          @signal = Signal.new
          super
        end

        # Shuts the signal down and removes it from the +selector+ before closing as usual.
        def close(selector = Selector.new)
          wakeup_signal = @signal
          wakeup_signal.terminate
          selector.deregister(wakeup_signal)
          super
        end

        # For connections with the +stream+ option set, also registers the signal in the
        # +selector+ and hands it over to the +connection+.
        def select_connection(connection, selector)
          if connection.options.stream
            super
            selector.register(@signal)
            connection.signal = @signal
          else
            super
          end
        end

        # For connections with the +stream+ option set, detaches the signal from the
        # +connection+ once it has been deselected.
        def deselect_connection(connection, *)
          streaming = connection.options.stream
          outcome = super
          return outcome unless streaming

          connection.signal = nil
        end
      end

      # Adds synchronization to request operations which may buffer payloads from different
      # threads.
      module RequestMethods
        # whether the request headers have already been sent (maintained by #transition).
        attr_accessor :headers_sent

        # Initializes the bidi-specific state: headers not sent, request not closed, no
        # +:body+ flush callback registered, plus the mutex guarding #<< and #close.
        def initialize(*)
          super
          @headers_sent = false
          @closed = false
          @flush_buffer_on_body_cb = nil
          @mutex = Thread::Mutex.new
        end

        # Registers +cb+ on the +:body+ event and keeps the value returned by +on+, so
        # that #transition can remove the callback again when going back to +:idle+.
        def flush_buffer_on_body(&cb)
          @flush_buffer_on_body_cb = on(:body, &cb)
        end

        # With the +stream+ option set, the request is closed only once #close was called.
        def closed?
          return super unless @options.stream

          @closed
        end

        # With the +stream+ option set, buffering is additionally refused while the request
        # is in the +:waiting_for_chunk+ state.
        def can_buffer?
          return super unless @options.stream

          super && @state != :waiting_for_chunk
        end

        # overrides state management transitions to introduce an intermediate
        # +:waiting_for_chunk+ state, which the request transitions to once payload
        # is buffered.
        #
        # The new value for +@headers_sent+ is computed up front but only stored after
        # +super+ has run.
        def transition(nextstate)
          return super unless @options.stream

          headers_sent = @headers_sent

          case nextstate
          when :idle
            # back to square one: headers must be sent again, and the +:body+ flush
            # callback (if any) is unregistered.
            headers_sent = false

            if @flush_buffer_on_body_cb
              callbacks(:body).delete(@flush_buffer_on_body_cb)
              @flush_buffer_on_body_cb = nil
            end
          when :waiting_for_chunk
            # only reachable from +:body+; ignored from any other state.
            return unless @state == :body
          when :body
            case @state
            when :headers
              headers_sent = true
            when :waiting_for_chunk
              # HACK: to allow super to pass through
              @state = :headers
            end
          end

          super.tap do
            # delay setting this up until after the first transition to :body
            @headers_sent = headers_sent
          end
        end

        # Appends +chunk+ to the request body and transitions to +:body+, all while holding
        # +@mutex+. If +@drainer+ is set, the body is first cleared (when it responds to
        # +clear+) and +@drainer+ is reset.
        # NOTE(review): unlike the other overrides in this module, this method does not check
        # +@options.stream+ - confirm this is intended.
        def <<(chunk)
          @mutex.synchronize do
            if @drainer
              @body.clear if @body.respond_to?(:clear)
              @drainer = nil
            end
            @body << chunk

            transition(:body)
          end
        end

        # With the +stream+ option set, marks the request as closed (only the first call has
        # any effect) and then buffers an empty chunk.
        def close
          return super unless @options.stream

          @mutex.synchronize do
            return if @closed

            @closed = true
          end

          # last chunk to send which ends the stream
          self << ""
        end
      end

      # Request body adjustments for bidirectional streams.
      module RequestBodyMethods
        # With the +stream+ option set, drops the "content-length" header and only accepts
        # bodies which are a +Transcoder::Body::Encoder+ (or no body at all).
        #
        # Raises +Error+ when a body of any other kind was given.
        def initialize(*, **)
          super

          return unless @options.stream

          @headers.delete("content-length")

          return unless @body

          return if @body.is_a?(Transcoder::Body::Encoder)

          # the trailing space keeps the two sentences apart once the literals are joined.
          raise Error, "bidirectional streams only allow the usage of the `:body` param to set request bodies. " \
                       "You must encode it yourself if you wish to do so."
        end

        # With the +stream+ option set, the body is never reported as empty.
        def empty?
          return super unless @options.stream

          false
        end
      end

      # overrides the declaration of +@write_buffer+, which is now a thread-safe buffer
      # responding to the same API.
      module ConnectionMethods
        # the Signal used to wake up the selector loop; may be +nil+ (it is assigned when the
        # connection is selected, and reset to +nil+ when deselected).
        attr_writer :signal

        # With the +stream+ option set, replaces +@write_buffer+ with a BidiBuffer of
        # +@options.buffer_size+.
        def initialize(*)
          super

          return unless @options.stream

          @write_buffer = BidiBuffer.new(@options.buffer_size)
        end

        # rebuffers the +@write_buffer+ before calculating interests.
        def interests
          return super unless @options.stream

          @write_buffer.rebuffer

          super
        end

        # With the +stream+ option set, an error stored in the signal is routed to +on_error+
        # instead of running the regular call.
        #
        # +@signal+ may be +nil+ here (not yet selected, or already deselected), in which case
        # there is no error to report and the regular call runs.
        def call
          return super unless @options.stream && (error = @signal&.error)

          on_error(error)
        end

        private

        # With the +stream+ option set, additionally wakes up the signal (when there is one)
        # each time the parser emits +:flush_buffer+.
        def set_parser_callbacks(parser)
          return super unless @options.stream

          super
          parser.on(:flush_buffer) do
            @signal&.wakeup
          end
        end
      end
    end
    register_plugin :stream_bidi, StreamBidi
  end
end