1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233
|
# frozen_string_literal: true
require "httpx/selector"
require "httpx/connection"
require "httpx/connection/http2"
require "httpx/connection/http1"
require "httpx/resolver"
module HTTPX
class Pool
using URIExtensions
POOL_TIMEOUT = 5
# Builds an empty pool configured from +options+, which is queried with +fetch+
# for the following keys:
#
# :max_connections:: the maximum number of connections held in the pool
#                    (defaults to Float::INFINITY).
# :max_connections_per_origin:: the maximum number of connections held in the pool
#                               pointing to a given origin (defaults to Float::INFINITY).
# :pool_timeout:: the number of seconds to wait for a connection to become available,
#                 before PoolTimeoutError is raised (defaults to POOL_TIMEOUT).
#
def initialize(options)
  # configured limits
  @max_connections = options.fetch(:max_connections, Float::INFINITY)
  @max_connections_per_origin = options.fetch(:max_connections_per_origin, Float::INFINITY)
  @pool_timeout = options.fetch(:pool_timeout, POOL_TIMEOUT)

  # resolvers checked into the pool, grouped by resolver type, and the lock guarding them
  @resolver_mtx = Thread::Mutex.new
  @resolvers = Hash.new { |pooled, type| pooled[type] = [] }

  # connections checked into the pool, the lock guarding them, and the pool-wide bookkeeping
  @connection_mtx = Thread::Mutex.new
  @connections = []
  @connections_counter = 0
  @max_connections_cond = ConditionVariable.new

  # per-origin bookkeeping; a condition variable is created the first time an origin is looked up
  @origin_counters = Hash.new(0)
  @origin_conds = Hash.new { |conds, origin| conds[origin] = ConditionVariable.new }
end
# Takes a connection out of the pool, under the connection lock, and returns it.
# Connections handed out by this method are not expected to be checked back in.
def pop_connection
  @connection_mtx.synchronize { drop_connection }
end
# Checks out a connection able to serve +uri+ under +options+.
#
# Many hostnames are reachable through the same IP, so a matching connection already
# in the pool is reused whenever possible, to maximize pipelining by opening as few
# connections as possible.
#
# When +options.io+ is set, a new connection is created right away and none of the pool
# limits or counters are involved. Otherwise, when no pooled connection matches:
#
# * while the pool is at +max_connections+, the caller waits until a matching connection
#   shows up, the counter moves away from the limit, or a closed connection can be
#   dropped to make room;
# * while the origin is at +max_connections_per_origin+, the caller waits until a
#   matching connection shows up;
# * finally, both counters are incremented and a new connection is created.
#
# Each of the two waits is bounded by +pool_timeout+ seconds as a whole: whenever a wait
# is resumed after a wakeup which did not yield a connection, only the time left until the
# deadline is waited for.
#
# Returns the connection; raises PoolTimeoutError when a wait exceeds its deadline.
def checkout_connection(uri, options)
  return checkout_new_connection(uri, options) if options.io

  @connection_mtx.synchronize do
    acquire_connection(uri, options) || begin
      if @connections_counter == @max_connections
        # this takes precedence over per-origin
        expires_at = Utils.now + @pool_timeout

        loop do
          # wait only for what is left until the deadline, so that resuming the wait does not
          # start a whole new timeout period (clamped, as a negative timeout is not accepted).
          @max_connections_cond.wait(@connection_mtx, [expires_at - Utils.now, 0].max)

          if (conn = acquire_connection(uri, options))
            return conn
          end

          # if one can afford to create a new connection, do it
          break unless @connections_counter == @max_connections

          # if no matching usable connection was found, the pool will make room and drop a closed connection.
          if (conn = @connections.find { |c| c.state == :closed })
            drop_connection(conn)
            break
          end

          # happens when a condition was signalled, but another thread snatched the available connection before
          # context was passed back here.
          next if Utils.now < expires_at

          raise PoolTimeoutError.new(@pool_timeout,
                                     "Timed out after #{@pool_timeout} seconds while waiting for a connection")
        end
      end

      if @origin_counters[uri.origin] == @max_connections_per_origin
        expires_at = Utils.now + @pool_timeout

        loop do
          # the condition variable is looked up on each iteration, and only the remaining time is waited for.
          @origin_conds[uri.origin].wait(@connection_mtx, [expires_at - Utils.now, 0].max)

          if (conn = acquire_connection(uri, options))
            return conn
          end

          # happens when a condition was signalled, but another thread snatched the available connection before
          # context was passed back here.
          next if Utils.now < expires_at

          raise(PoolTimeoutError.new(@pool_timeout,
                                     "Timed out after #{@pool_timeout} seconds while waiting for a connection to #{uri.origin}"))
        end
      end

      @connections_counter += 1
      @origin_counters[uri.origin] += 1

      checkout_new_connection(uri, options)
    end
  end
end
# Hands +connection+ back to the pool.
#
# Nothing happens when the connection's options carry an +io+. A coalesced or idle
# connection is dropped from the pool instead of being stored. Any other connection is
# appended to the pool, after which one waiter on the pool-wide condition and one waiter
# on the condition of the connection's origin are signalled.
#
# Always returns +nil+.
def checkin_connection(connection)
  return if connection.options.io

  @connection_mtx.synchronize do
    if connection.coalesced? || connection.state == :idle
      # when connections coalesce
      drop_connection(connection)
    else
      @connections.push(connection)

      @max_connections_cond.signal
      @origin_conds[connection.origin.to_s].signal

      # Observed situations where a session handling multiple requests in a loop
      # across multiple threads checks the same connection in and out, while another
      # thread which is waiting on the same connection never gets the chance to pick
      # it up, because ruby's thread scheduler never switched on to it in the process.
      Thread.pass
    end

    nil
  end
end
# Removes from the pool, and returns, the first pooled connection other than +connection+
# which answers true to <code>mergeable?(connection)</code>.
#
# Returns +nil+ when there is no such connection, or when the options of +connection+
# carry an +io+ (the pool is not searched in that case).
def checkout_mergeable_connection(connection)
  return if connection.options.io

  @connection_mtx.synchronize do
    position = @connections.index do |candidate|
      candidate != connection && candidate.mergeable?(connection)
    end

    position && @connections.delete_at(position)
  end
end
# Empties the registry of resolvers checked into the pool, under the resolver lock.
def reset_resolvers
  @resolver_mtx.synchronize do
    @resolvers.clear
  end
end
# Returns a resolver for +options+: the first resolver checked into the pool under
# +options.resolver_class+ whose own options equal +options+ is removed from the pool and
# returned; when there is none, a new resolver is created.
def checkout_resolver(options)
  resolver_type = options.resolver_class

  pooled = @resolver_mtx.synchronize do
    candidates = @resolvers[resolver_type]
    position = candidates.index { |candidate| candidate.options == options }

    candidates.delete_at(position) if position
  end

  pooled || checkout_new_resolver(resolver_type, options)
end
# Hands +resolver+ back to the pool.
#
# What gets stored is the object returned by <code>resolver.multi</code>, filed under the
# class of +resolver+ itself. Nothing is stored unless that object reports being closed
# (a multi is expected to require all of its sub-resolvers to be closed for that), nor
# when it is already present in the pool.
def checkin_resolver(resolver)
  pool_key = resolver.class
  multi = resolver.multi

  # a multi requires all sub-resolvers being closed in order to be
  # correctly checked back in.
  return unless multi.closed?

  @resolver_mtx.synchronize do
    pooled = @resolvers[pool_key]
    next if pooled.include?(multi)

    pooled.push(multi)
  end
end
# :nocov:
# Compact description of the pool: its class and object id, the configured limits and
# timeout, and the number of connections currently checked in.
def inspect
  details = [
    "@max_connections=#{@max_connections}",
    "@max_connections_per_origin=#{@max_connections_per_origin}",
    "@pool_timeout=#{@pool_timeout}",
    "@connections=#{@connections.size}"
  ]
  summary = details.join(" ")

  "#<#{self.class}:#{object_id} #{summary}>"
end
# :nocov:
private
# Removes and returns the first pooled connection which matches +uri+ and +options+;
# returns +nil+, leaving the pool as it is, when no connection matches.
def acquire_connection(uri, options)
  position = @connections.index { |candidate| candidate.match?(uri, options) }

  @connections.delete_at(position) if position
end
# Instantiates +options.connection_class+ for +uri+ and +options+, logs the creation
# at level 2, and returns the new connection.
def checkout_new_connection(uri, options)
  options.connection_class.new(uri, options).tap do |connection|
    connection.log(level: 2) { "created connection##{connection.object_id} in pool##{object_id}" }
  end
end
# Builds a resolver of the given +resolver_type+: when the type answers true to +multi?+,
# it is wrapped in a Resolver::Multi, otherwise it is instantiated directly with +options+.
# The creation is logged at level 2, and the new resolver is returned.
def checkout_new_resolver(resolver_type, options)
  built = resolver_type.multi? ? Resolver::Multi.new(resolver_type, options) : resolver_type.new(options)

  built.tap do |resolver|
    resolver.log(level: 2) { "created resolver##{resolver.object_id} in pool##{object_id}" }
  end
end
# Drops and returns the +connection+ from the connection pool; if +connection+ is +nil+ (default),
# the first available connection from the pool will be dropped instead.
#
# Returns +nil+, without touching any counter, when no +connection+ is given and the pool is empty.
# Otherwise the pool-wide counter and the counter of the connection's origin are both decremented;
# once the origin counter reaches zero, the condition variable kept for that origin is discarded.
#
# The caller is expected to hold the connection lock: this method does no locking of its own.
def drop_connection(connection = nil)
  if connection
    @connections.delete(connection)
  else
    connection = @connections.shift

    return unless connection
  end

  @connections_counter -= 1

  # per-origin state is keyed by the string form of the origin, so the very same key has to be
  # used for the counter and for the condition variable (otherwise the latter is never removed).
  origin = connection.origin.to_s

  if (@origin_counters[origin] -= 1).zero?
    # wake up whoever may still be waiting on the discarded condition variable, so that it
    # re-enters its wait on the one which will be created on the next lookup for this origin.
    @origin_conds.delete(origin)&.broadcast
  end

  connection
end
end
end
|