File: fiber_concurrency.rb

package info (click to toggle)
ruby-httpx 1.7.2-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,816 kB
  • sloc: ruby: 12,209; makefile: 4
file content (210 lines) | stat: -rw-r--r-- 4,758 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# frozen_string_literal: true

module HTTPX
  module Plugins
    # This plugin makes a session reuse the same selector across all fibers in a given thread.
    #
    # This enables integration with fiber scheduler implementations such as [async](https://github.com/async).
    #
    # https://gitlab.com/os85/httpx/wikis/Fiber-Concurrency
    #
    module FiberConcurrency
      # returns the sub-plugin modules of this plugin, keyed by :h2c and :stream.
      def self.subplugins
        { h2c: FiberConcurrencyH2C, stream: FiberConcurrencyStream }
      end

      module InstanceMethods
        private

        # binds the request to the calling fiber (via Request#set_context!) before
        # handing it over to the default implementation.
        def send_request(request, *)
          request.set_context!

          super
        end

        # returns the selector found by the default implementation, which is called
        # without forwarding the block.
        #
        # when none is found and a block was given, the value yielded by the block
        # is stored via #set_current_selector and returned; without a block, nil
        # is returned.
        def get_current_selector
          selector = super(&nil)

          return selector if selector
          return unless block_given?

          (yield).tap { |fallback| set_current_selector(fallback) }
        end
      end

      module RequestMethods
        # the execution context (fiber) this request is bound to. nil until
        # #set_context! is called, and nil again after #complete!.
        attr_reader :context

        def initialize(*)
          super
          @context = nil
        end

        # binds this request to the current fiber, unless it is already bound to one.
        # returns the fiber the request is bound to.
        def set_context!
          return @context if @context

          @context = Fiber.current
        end

        # checks whether the calling fiber is the one this request is bound to.
        def current_context?
          Fiber.current == @context
        end

        # drops the fiber reference before running the default completion logic.
        def complete!(response = @response)
          @context = nil
          super
        end
      end

      module ConnectionMethods
        # truthy when any pending request of this connection, or of its sibling
        # connection (when there is one), is bound to the calling fiber.
        def current_context?
          return true if @pending.any?(&:current_context?)

          @sibling && @sibling.pending.any?(&:current_context?)
        end

        # reports no interests while connecting when none of the pending requests
        # belongs to the calling fiber; otherwise defers to the default implementation.
        def interests
          not_for_this_fiber = connecting? && @pending.none?(&:current_context?)

          super unless not_for_this_fiber
        end

        def send(request)
          # DoH requests bypass the session, so context needs to be set here.
          request.set_context!

          super
        end
      end

      module HTTP1Methods
        # reports no interests when there is no request at hand, or when neither the
        # request at hand, nor any of the queued (@requests) or pending (@pending)
        # ones, is bound to the calling fiber. otherwise defers to the default
        # implementation.
        def interests
          head = @request || @requests.first

          return unless head

          in_this_fiber = head.current_context? ||
                          @requests.any?(&:current_context?) ||
                          @pending.any?(&:current_context?)

          super if in_this_fiber
        end
      end

      module HTTP2Methods
        def initialize(*)
          super
          # registry of tracked requests, grouped by the fiber (request.context)
          # they are bound to. entries are created on first access.
          @contexts = Hash.new { |registry, fiber| registry[fiber] = Set.new }
        end

        # when the connection is connected and the handshake is completed, but the
        # calling fiber has no request tracked here, returns :w if @pings is not
        # empty and nil otherwise. in all other cases defers to the default
        # implementation.
        def interests
          idle_for_fiber = @connection.state == :connected &&
                           @handshake_completed &&
                           !@contexts.key?(Fiber.current)

          return super unless idle_for_fiber

          :w unless @pings.empty?
        end

        # tracks the request under its fiber before sending it.
        def send(request, *)
          add_to_context(request)

          super
        end

        private

        def on_close(_, error, _)
          # remove all pending requests context
          @pending.each { |request| clear_from_context(request) } if error == :http_1_1_required

          super
        end

        # stops tracking the request when it still has a stream registered and the
        # error is anything other than :stream_closed, then runs the default handler.
        def on_stream_close(_, request, error)
          still_tracked = error != :stream_closed && @streams.key?(request)

          clear_from_context(request) if still_tracked

          super
        end

        # after the default teardown, stops tracking the given request, or every
        # request when none is given.
        def teardown(request = nil)
          super

          return clear_from_context(request) if request

          @contexts.clear
        end

        # registers the request under the fiber it is bound to.
        def add_to_context(request)
          @contexts[request.context].add(request)
        end

        # removes the request from its fiber's set, dropping the fiber entry
        # altogether once the set becomes empty.
        def clear_from_context(request)
          fiber = request.context
          tracked = @contexts[fiber]

          tracked.delete(request)

          @contexts.delete(fiber) if tracked.empty?
        end
      end

      module NativeResolverMethods
        private

        # reports no interests when there are no queries, or when none of the
        # queried entries nor any of the connections belongs to the calling fiber.
        # otherwise defers to the default implementation.
        def calculate_interests
          return if @queries.empty?

          in_this_fiber = @queries.each_value.any?(&:current_context?) ||
                          @connections.any?(&:current_context?)

          super if in_this_fiber
        end
      end

      module SystemResolverMethods
        # defers to the default implementation only when at least one of the queried
        # connections belongs to the calling fiber; reports no interests otherwise.
        def interests
          in_this_fiber = @queries.any? { |_, connection| connection.current_context? }

          super if in_this_fiber
        end
      end

      module FiberConcurrencyH2C
        module HTTP2Methods
          # registers the request being upgraded in @contexts, under the fiber it is
          # bound to (request.context), before running the default upgrade.
          def upgrade(request, *)
            @contexts[request.context].add(request)

            super
          end
        end
      end

      module FiberConcurrencyStream
        module StreamResponseMethods
          # runs the default close only when called from the fiber the request is
          # bound to. from any other fiber, only the request is closed and nil is
          # returned.
          def close
            return super if @request.current_context?

            @request.close

            nil
          end
        end
      end
    end

    # registers the FiberConcurrency module under the :fiber_concurrency name.
    register_plugin :fiber_concurrency, FiberConcurrency
  end
end