1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285
|
# frozen-string-literal: true
# A connection pool allowing multi-threaded access to a pool of connections.
# This is the default connection pool used by Sequel.
class Sequel::ThreadedConnectionPool < Sequel::ConnectionPool
USE_WAITER = true # SEQUEL6: Remove
Sequel::Deprecation.deprecate_constant(self, :USE_WAITER)
# The maximum number of connections this pool will create (per shard/server
# if sharding).
attr_reader :max_size
# An array of connections that are available for use by the pool.
# The calling code should already have the mutex before calling this.
attr_reader :available_connections # SEQUEL6: Remove
# A hash with thread/fiber keys and connection values for currently allocated connections.
# The calling code should already have the mutex before calling this.
attr_reader :allocated # SEQUEL6: Remove
# The following additional options are respected:
# :max_connections :: The maximum number of connections the connection pool
# will open (default 4)
# :pool_timeout :: The amount of seconds to wait to acquire a connection
# before raising a PoolTimeout error (default 5)
def initialize(db, opts = OPTS)
super
@max_size = Integer(opts[:max_connections] || 4)
raise(Sequel::Error, ':max_connections must be positive') if @max_size < 1
@mutex = Mutex.new
@connection_handling = opts[:connection_handling]
@available_connections = []
@allocated = {}
@allocated.compare_by_identity
@timeout = Float(opts[:pool_timeout] || 5)
@waiter = ConditionVariable.new
end
# Yield all of the available connections, and the one currently allocated to
# this thread. This will not yield connections currently allocated to other
# threads, as it is not safe to operate on them. This holds the mutex while
# it is yielding all of the available connections, which means that until
# the method's block returns, the pool is locked.
def all_connections
hold do |c|
sync do
yield c
@available_connections.each{|conn| yield conn}
end
end
end
# Removes all connections currently available. This method has the effect of
# disconnecting from the database, assuming that no connections are currently
# being used. If you want to be able to disconnect connections that are
# currently in use, use the ShardedThreadedConnectionPool, which can do that.
# This connection pool does not, for performance reasons. To use the sharded pool,
# pass the <tt>servers: {}</tt> option when connecting to the database.
#
# Once a connection is requested using #hold, the connection pool
# creates new connections to the database.
def disconnect(opts=OPTS)
conns = nil
sync do
conns = @available_connections.dup
@available_connections.clear
@waiter.signal
end
conns.each{|conn| disconnect_connection(conn)}
end
# Chooses the first available connection, or if none are
# available, creates a new connection. Passes the connection to the supplied
# block:
#
# pool.hold {|conn| conn.execute('DROP TABLE posts')}
#
# Pool#hold is re-entrant, meaning it can be called recursively in
# the same thread without blocking.
#
# If no connection is immediately available and the pool is already using the maximum
# number of connections, Pool#hold will block until a connection
# is available or the timeout expires. If the timeout expires before a
# connection can be acquired, a Sequel::PoolTimeout is raised.
def hold(server=nil)
t = Sequel.current
if conn = owned_connection(t)
return yield(conn)
end
begin
conn = acquire(t)
yield conn
rescue Sequel::DatabaseDisconnectError, *@error_classes => e
if disconnect_error?(e)
oconn = conn
conn = nil
disconnect_connection(oconn) if oconn
sync do
@allocated.delete(t)
@waiter.signal
end
end
raise
ensure
if conn
sync{release(t)}
if @connection_handling == :disconnect
disconnect_connection(conn)
end
end
end
end
def pool_type
:threaded
end
# The total number of connections opened, either available or allocated.
# The calling code should not have the mutex before calling this.
def size
@mutex.synchronize{_size}
end
private
# The total number of connections opened, either available or allocated.
# The calling code should already have the mutex before calling this.
def _size
@allocated.length + @available_connections.length
end
# Assigns a connection to the supplied thread, if one
# is available. The calling code should NOT already have the mutex when
# calling this.
#
# This should return a connection is one is available within the timeout,
# or raise PoolTimeout if a connection could not be acquired within the timeout.
def acquire(thread)
if conn = assign_connection(thread)
return conn
end
timeout = @timeout
timer = Sequel.start_timer
sync do
@waiter.wait(@mutex, timeout)
if conn = next_available
return(@allocated[thread] = conn)
end
end
until conn = assign_connection(thread)
elapsed = Sequel.elapsed_seconds_since(timer)
# :nocov:
raise_pool_timeout(elapsed) if elapsed > timeout
# It's difficult to get to this point, it can only happen if there is a race condition
# where a connection cannot be acquired even after the thread is signalled by the condition variable
sync do
@waiter.wait(@mutex, timeout - elapsed)
if conn = next_available
return(@allocated[thread] = conn)
end
end
# :nocov:
end
conn
end
# Assign a connection to the thread, or return nil if one cannot be assigned.
# The caller should NOT have the mutex before calling this.
def assign_connection(thread)
# Thread safe as instance variable is only assigned to local variable
# and not operated on outside mutex.
allocated = @allocated
do_make_new = false
to_disconnect = nil
sync do
if conn = next_available
return(allocated[thread] = conn)
end
if (n = _size) >= (max = @max_size)
allocated.keys.each do |t|
unless t.alive?
(to_disconnect ||= []) << allocated.delete(t)
end
end
n = nil
end
if (n || _size) < max
do_make_new = allocated[thread] = true
end
end
if to_disconnect
to_disconnect.each{|dconn| disconnect_connection(dconn)}
end
# Connect to the database outside of the connection pool mutex,
# as that can take a long time and the connection pool mutex
# shouldn't be locked while the connection takes place.
if do_make_new
begin
conn = make_new(:default)
sync{allocated[thread] = conn}
ensure
unless conn
sync{allocated.delete(thread)}
end
end
end
conn
end
# Return a connection to the pool of available connections, returns the connection.
# The calling code should already have the mutex before calling this.
def checkin_connection(conn)
@available_connections << conn
conn
end
# Return the next available connection in the pool, or nil if there
# is not currently an available connection. The calling code should already
# have the mutex before calling this.
def next_available
case @connection_handling
when :stack
@available_connections.pop
else
@available_connections.shift
end
end
# Returns the connection owned by the supplied thread,
# if any. The calling code should NOT already have the mutex before calling this.
def owned_connection(thread)
sync{@allocated[thread]}
end
# Create the maximum number of connections immediately. The calling code should
# NOT have the mutex before calling this.
def preconnect(concurrent = false)
enum = (max_size - _size).times
conns = if concurrent
enum.map{Thread.new{make_new(:default)}}.map(&:value)
else
enum.map{make_new(:default)}
end
sync{conns.each{|conn| checkin_connection(conn)}}
end
# Raise a PoolTimeout error showing the current timeout, the elapsed time, and the
# database's name (if any).
def raise_pool_timeout(elapsed)
name = db.opts[:name]
raise ::Sequel::PoolTimeout, "timeout: #{@timeout}, elapsed: #{elapsed}#{", database name: #{name}" if name}"
end
# Releases the connection assigned to the supplied thread back to the pool.
# The calling code should already have the mutex before calling this.
def release(thread)
conn = @allocated.delete(thread)
unless @connection_handling == :disconnect
checkin_connection(conn)
end
@waiter.signal
nil
end
# Yield to the block while inside the mutex. The calling code should NOT
# already have the mutex before calling this.
def sync
@mutex.synchronize{yield}
end
end
|