1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
|
# frozen_string_literal: true
module HTTPX
module Plugins
# This plugin makes a session reuse the same selector across all fibers in a given thread.
#
# This enables integration with fiber scheduler implementations such as [async](https://github.com/async).
#
# # https://gitlab.com/os85/httpx/wikis/Fiber-Concurrency
#
module FiberConcurrency
# Returns a hash that pairs a plugin name (+:h2c+, +:stream+) with the
# companion module defined further down in this file for that plugin.
# NOTE(review): presumably read by the plugin loader when one of those
# plugins is enabled together with this one — confirm against the loader.
def self.subplugins
  { h2c: FiberConcurrencyH2C, stream: FiberConcurrencyStream }
end
# Session-level overrides: every outgoing request gets tagged with an
# execution context, and the selector lookup accepts a fallback block.
module InstanceMethods
  private

  # Tags +request+ with its execution context (see +set_context!+ on the
  # request) before handing over to the original implementation.
  def send_request(request, *)
    request.set_context!
    super
  end

  # Returns the selector found by the original lookup, which is invoked
  # explicitly WITHOUT a block. When that lookup yields nothing and a block
  # was given, the block's value is stored via +set_current_selector+ and
  # returned; without a block the result is +nil+.
  def get_current_selector
    selector = super(&nil)
    return selector if selector
    return unless block_given?

    selector = yield
    set_current_selector(selector)
    selector
  end
end
# Request-level bookkeeping of the fiber a request belongs to.
module RequestMethods
  # the execution context (fiber) bound to this request, +nil+ while unbound.
  attr_reader :context

  # Runs the original initializer, then starts with no bound context.
  def initialize(*)
    super
    @context = nil
  end

  # binds the request to the current fiber unless a context is already set;
  # returns the bound context either way.
  def set_context!
    return @context if @context

    @context = Fiber.current
  end

  # true when the calling fiber is the one bound to this request.
  def current_context?
    Fiber.current == @context
  end

  # drops the bound context, then delegates to the original implementation.
  def complete!(response = @response)
    @context = nil
    super
  end
end
# Connection-level overrides that make readiness reporting fiber-aware.
module ConnectionMethods
  # Truthy when a pending request of this connection, or of its sibling
  # connection (if one is set), belongs to the calling fiber.
  def current_context?
    return true if @pending.any?(&:current_context?)

    @sibling && @sibling.pending.any?(&:current_context?)
  end

  # Reports nothing while still connecting if none of the pending requests
  # belongs to the calling fiber; otherwise defers to the original.
  def interests
    super unless connecting? && @pending.none?(&:current_context?)
  end

  def send(request)
    # DoH requests bypass the session, so the context has to be bound here.
    request.set_context!
    super
  end
end
# HTTP/1 parser override that hides interests from unrelated fibers.
module HTTP1Methods
  # Returns +nil+ when there is no request at hand (neither +@request+ nor a
  # first entry in +@requests+), or when none of the in-flight, queued or
  # pending requests belongs to the calling fiber. Otherwise defers to the
  # original implementation.
  def interests
    head = @request || @requests.first
    return unless head

    ours = head.current_context? ||
           @requests.any?(&:current_context?) ||
           @pending.any?(&:current_context?)
    super if ours
  end
end
# HTTP/2 parser overrides. Keeps a registry (+@contexts+) that maps a
# request's context (the fiber it was bound to) to the Set of requests
# registered for it, and uses that registry to answer +interests+ per fiber.
module HTTP2Methods
  # Runs the original initializer, then creates the registry; a missing key
  # is populated with an empty Set on first access.
  def initialize(*)
    super
    @contexts = Hash.new { |registry, fiber| registry[fiber] = Set.new }
  end

  # Once the connection is connected and the handshake is completed, a fiber
  # without registered requests only gets +:w+ while pings are outstanding,
  # and +nil+ otherwise. In every other case the original answer is used.
  def interests
    idle_fiber = @connection.state == :connected &&
                 @handshake_completed &&
                 !@contexts.key?(Fiber.current)
    return super unless idle_fiber

    @pings.empty? ? nil : :w
  end

  # Registers +request+ under its context before sending it.
  def send(request, *)
    add_to_context(request)
    super
  end

  private

  # When the close reason is +:http_1_1_required+, every pending request is
  # dropped from the registry before the original handler runs.
  def on_close(_, error, _)
    @pending.each { |pending_request| clear_from_context(pending_request) } if error == :http_1_1_required
    super
  end

  # Unregisters +request+ when its stream is known to this parser and the
  # close reason is anything but +:stream_closed+.
  def on_stream_close(_, request, error)
    tracked = error != :stream_closed && @streams.key?(request)
    clear_from_context(request) if tracked
    super
  end

  # After the original teardown: unregister just +request+ when given,
  # otherwise empty the whole registry.
  def teardown(request = nil)
    super
    return @contexts.clear unless request

    clear_from_context(request)
  end

  # Adds +request+ to the Set kept for its context.
  def add_to_context(request)
    @contexts[request.context].add(request)
  end

  # Removes +request+ from the Set kept for its context, and drops the
  # context's entry altogether once that Set is empty.
  def clear_from_context(request)
    fiber = request.context
    registered = @contexts[fiber]
    registered.delete(request)
    @contexts.delete(fiber) if registered.empty?
  end
end
# Native resolver override that hides interests from unrelated fibers.
module NativeResolverMethods
  private

  # Returns +nil+ when there are no queries, or when neither a queried
  # object (the values of +@queries+) nor one of +@connections+ belongs to
  # the calling fiber; otherwise defers to the original computation.
  def calculate_interests
    return if @queries.empty?

    ours = @queries.values.any?(&:current_context?) ||
           @connections.any?(&:current_context?)
    super if ours
  end
end
# System resolver override that hides interests from unrelated fibers.
module SystemResolverMethods
  # Defers to the original implementation only when the second element of
  # at least one +@queries+ entry belongs to the calling fiber; +nil+
  # otherwise.
  def interests
    super if @queries.any? { |_, connection| connection.current_context? }
  end
end
# Companion module listed under +:h2c+ in +FiberConcurrency.subplugins+.
module FiberConcurrencyH2C
  module HTTP2Methods
    # Registers +request+ under its execution context before delegating to
    # the original +upgrade+.
    #
    # Goes through the +add_to_context+ helper (the same one +send+ uses in
    # FiberConcurrency::HTTP2Methods) instead of writing to +@contexts+
    # directly, so that registration logic lives in a single place.
    def upgrade(request, *)
      add_to_context(request)
      super
    end
  end
end
# Companion module listed under +:stream+ in +FiberConcurrency.subplugins+.
module FiberConcurrencyStream
  module StreamResponseMethods
    # Uses the original +close+ only when called from the fiber the request
    # is bound to. From any other fiber the request itself is closed and
    # +nil+ is returned, without running the original +close+.
    def close
      return super if @request.current_context?

      @request.close
      nil
    end
  end
end
end
# Registers the FiberConcurrency module under the :fiber_concurrency name.
# NOTE(review): presumably this is what lets a session enable it by that
# name — confirm against register_plugin.
register_plugin :fiber_concurrency, FiberConcurrency
end
end
|