1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
|
require 'thread'
# A ConnectionPool manages access to database connections by keeping
# multiple connections and giving threads exclusive access to each
# connection.
class ConnectionPool
attr_reader :mutex
# The maximum number of connections.
attr_reader :max_size
# The proc used to create a new connection.
attr_accessor :connection_proc
attr_reader :available_connections, :allocated, :created_count
# Constructs a new pool with a maximum size. If a block is supplied, it
# is used to create new connections as they are needed.
#
# pool = ConnectionPool.new(10) {MyConnection.new(opts)}
#
# The connection creation proc can be changed at any time by assigning a
# Proc to pool#connection_proc.
#
# pool = ConnectionPool.new(10)
# pool.connection_proc = proc {MyConnection.new(opts)}
def initialize(max_size = 4, &block)
@max_size = max_size
@mutex = Mutex.new
@connection_proc = block
@available_connections = []
@allocated = {}
@created_count = 0
end
# Returns the number of created connections.
def size
@created_count
end
# Assigns a connection to the current thread, yielding the connection
# to the supplied block.
#
# pool.hold {|conn| conn.execute('DROP TABLE posts')}
#
# Pool#hold is re-entrant, meaning it can be called recursively in
# the same thread without blocking.
#
# If no connection is available, Pool#hold will block until a connection
# is available.
def hold
t = Thread.current
if (conn = owned_connection(t))
return yield(conn)
end
while !(conn = acquire(t))
sleep 0.001
end
begin
yield conn
ensure
release(t)
end
rescue Exception => e
# if the error is not a StandardError it is converted into RuntimeError.
raise e.is_a?(StandardError) ? e : e.message
end
# Removes all connection currently available, optionally yielding each
# connection to the given block. This method has the effect of
# disconnecting from the database. Once a connection is requested using
# #hold, the connection pool creates new connections to the database.
def disconnect(&block)
@mutex.synchronize do
@available_connections.each {|c| block[c]} if block
@available_connections = []
@created_count = @allocated.size
end
end
private
# Returns the connection owned by the supplied thread, if any.
def owned_connection(thread)
@mutex.synchronize {@allocated[thread]}
end
# Assigns a connection to the supplied thread, if one is available.
def acquire(thread)
@mutex.synchronize do
if conn = available
@allocated[thread] = conn
end
end
end
# Returns an available connection. If no connection is available,
# tries to create a new connection.
def available
@available_connections.pop || make_new
end
# Creates a new connection if the size of the pool is less than the
# maximum size.
def make_new
if @created_count < @max_size
@created_count += 1
@connection_proc ? @connection_proc.call : \
(raise Error, "No connection proc specified")
end
end
# Releases the connection assigned to the supplied thread.
def release(thread)
@mutex.synchronize do
@available_connections << @allocated[thread]
@allocated.delete(thread)
end
end
end
# A SingleThreadedPool acts as a replacement for a ConnectionPool for use
# in single-threaded applications. ConnectionPool imposes a substantial
# performance penalty, so SingleThreadedPool is used to gain some speed.
class SingleThreadedPool
attr_reader :conn
attr_writer :connection_proc
# Initializes the instance with the supplied block as the connection_proc.
def initialize(&block)
@connection_proc = block
end
# Yields the connection to the supplied block. This method simulates the
# ConnectionPool#hold API.
def hold
@conn ||= @connection_proc.call
yield @conn
rescue Exception => e
# if the error is not a StandardError it is converted into RuntimeError.
raise e.is_a?(StandardError) ? e : e.message
end
# Disconnects from the database. Once a connection is requested using
# #hold, the connection is reestablished.
def disconnect(&block)
block[@conn] if block && @conn
@conn = nil
end
end
|