1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
|
require "timeout"
require_relative "connection_pool/version"
class ConnectionPool
class Error < ::RuntimeError; end
class PoolShuttingDownError < ::ConnectionPool::Error; end
class TimeoutError < ::Timeout::Error; end
end
# Generic connection pool class for sharing a limited number of objects or network connections
# among many threads. Note: pool elements are lazily created.
#
# Example usage with block (faster):
#
# @pool = ConnectionPool.new { Redis.new }
# @pool.with do |redis|
# redis.lpop('my-list') if redis.llen('my-list') > 0
# end
#
# Using optional timeout override (for that single invocation)
#
# @pool.with(timeout: 2.0) do |redis|
# redis.lpop('my-list') if redis.llen('my-list') > 0
# end
#
# Example usage replacing an existing connection (slower):
#
# $redis = ConnectionPool.wrap { Redis.new }
#
# def do_work
# $redis.lpop('my-list') if $redis.llen('my-list') > 0
# end
#
# Accepts the following options:
# - :size - number of connections to pool, defaults to 5
# - :timeout - amount of time to wait for a connection if none currently available, defaults to 5 seconds
# - :auto_reload_after_fork - automatically drop all connections after fork, defaults to true
#
class ConnectionPool
def self.wrap(**, &)
Wrapper.new(**, &)
end
attr_reader :size
def initialize(timeout: 5, size: 5, auto_reload_after_fork: true, name: nil, &)
raise ArgumentError, "Connection pool requires a block" unless block_given?
@size = Integer(size)
@timeout = Float(timeout)
@available = TimedStack.new(size: @size, &)
@key = :"pool-#{@available.object_id}"
@key_count = :"pool-#{@available.object_id}-count"
@discard_key = :"pool-#{@available.object_id}-discard"
INSTANCES[self] = self if auto_reload_after_fork && INSTANCES
end
def with(**)
# We need to manage exception handling manually here in order
# to work correctly with `Timeout.timeout` and `Thread#raise`.
# Otherwise an interrupted Thread can leak connections.
Thread.handle_interrupt(Exception => :never) do
conn = checkout(**)
begin
Thread.handle_interrupt(Exception => :immediate) do
yield conn
end
ensure
checkin
end
end
end
alias_method :then, :with
##
# Marks the current thread's checked-out connection for discard.
#
# When a connection is marked for discard, it will not be returned to the pool
# when checked in. Instead, the connection will be discarded.
# This is useful when a connection has become invalid or corrupted
# and should not be reused.
#
# Takes an optional block that will be called with the connection to be discarded.
# The block should perform any necessary clean-up on the connection.
#
# @yield [conn]
# @yieldparam conn [Object] The connection to be discarded.
# @yieldreturn [void]
#
#
# Note: This only affects the connection currently checked out by the calling thread.
# The connection will be discarded when +checkin+ is called.
#
# @return [void]
#
# @example
# pool.with do |conn|
# begin
# conn.execute("SELECT 1")
# rescue SomeConnectionError
# pool.discard_current_connection # Mark connection as bad
# raise
# end
# end
def discard_current_connection(&block)
::Thread.current[@discard_key] = block || proc { |conn| conn }
end
def checkout(timeout: @timeout, **)
if ::Thread.current[@key]
::Thread.current[@key_count] += 1
::Thread.current[@key]
else
conn = @available.pop(timeout:, **)
::Thread.current[@key] = conn
::Thread.current[@key_count] = 1
conn
end
end
def checkin(force: false)
if ::Thread.current[@key]
if ::Thread.current[@key_count] == 1 || force
if ::Thread.current[@discard_key]
begin
@available.decrement_created
::Thread.current[@discard_key].call(::Thread.current[@key])
rescue
nil
ensure
::Thread.current[@discard_key] = nil
end
else
@available.push(::Thread.current[@key])
end
::Thread.current[@key] = nil
::Thread.current[@key_count] = nil
else
::Thread.current[@key_count] -= 1
end
elsif !force
raise ConnectionPool::Error, "no connections are checked out"
end
nil
end
##
# Shuts down the ConnectionPool by passing each connection to +block+ and
# then removing it from the pool. Attempting to checkout a connection after
# shutdown will raise +ConnectionPool::PoolShuttingDownError+.
def shutdown(&)
@available.shutdown(&)
end
##
# Reloads the ConnectionPool by passing each connection to +block+ and then
# removing it the pool. Subsequent checkouts will create new connections as
# needed.
def reload(&)
@available.shutdown(reload: true, &)
end
## Reaps idle connections that have been idle for over +idle_seconds+.
# +idle_seconds+ defaults to 60.
def reap(idle_seconds: 60, &)
@available.reap(idle_seconds:, &)
end
# Number of pool entries available for checkout at this instant.
def available
@available.length
end
# Number of pool entries created and idle in the pool.
def idle
@available.idle
end
end
require_relative "connection_pool/timed_stack"
require_relative "connection_pool/wrapper"
require_relative "connection_pool/fork"
|