1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
|
# frozen-string-literal: true
# The base connection pool class, which all other connection pools are based
# on. This class is not instantiated directly, but subclasses should at
# the very least implement the following API:
#
# initialize(Database, Hash) :: Initialize using the passed Sequel::Database
# object and options hash.
# hold(Symbol, &block) :: Yield a connection object (obtained from calling
# the block passed to +initialize+) to the current block. For sharded
# connection pools, the Symbol passed is the shard/server to use.
# disconnect(Symbol) :: Disconnect the connection object. For sharded
# connection pools, the Symbol passed is the shard/server to use.
# servers :: An array of shard/server symbols for all shards/servers that this
# connection pool recognizes.
# size :: an integer representing the total number of connections in the pool,
# or for the given shard/server if sharding is supported.
# max_size :: an integer representing the maximum size of the connection pool,
# or the maximum size per shard/server if sharding is supported.
#
# For sharded connection pools, the sharded API adds the following methods:
#
# add_servers(Array of Symbols) :: start recognizing all shards/servers specified
# by the array of symbols.
# remove_servers(Array of Symbols) :: no longer recognize all shards/servers
# specified by the array of symbols.
class Sequel::ConnectionPool
OPTS = Sequel::OPTS
POOL_CLASS_MAP = {
:threaded => :ThreadedConnectionPool,
:single => :SingleConnectionPool,
:sharded_threaded => :ShardedThreadedConnectionPool,
:sharded_single => :ShardedSingleConnectionPool,
:timed_queue => :TimedQueueConnectionPool,
}
POOL_CLASS_MAP.to_a.each{|k, v| POOL_CLASS_MAP[k.to_s] = v}
POOL_CLASS_MAP.freeze
# Class methods used to return an appropriate pool subclass, separated
# into a module for easier overridding by extensions.
module ClassMethods
# Return a pool subclass instance based on the given options. If a <tt>:pool_class</tt>
# option is provided is provided, use that pool class, otherwise
# use a new instance of an appropriate pool subclass based on the
# <tt>:single_threaded</tt> and <tt>:servers</tt> options.
def get_pool(db, opts = OPTS)
connection_pool_class(opts).new(db, opts)
end
private
# Return a connection pool class based on the given options.
def connection_pool_class(opts)
if pc = opts[:pool_class]
unless pc.is_a?(Class)
unless name = POOL_CLASS_MAP[pc]
raise Sequel::Error, "unsupported connection pool type, please pass appropriate class as the :pool_class option"
end
require_relative "connection_pool/#{pc}"
pc = Sequel.const_get(name)
end
pc
else
pc = if opts[:single_threaded]
opts[:servers] ? :sharded_single : :single
else
opts[:servers] ? :sharded_threaded : :threaded
end
connection_pool_class(:pool_class=>pc)
end
end
end
extend ClassMethods
# The after_connect proc used for this pool. This is called with each new
# connection made, and is usually used to set custom per-connection settings.
# Deprecated.
attr_reader :after_connect # SEQUEL6: Remove
# Override the after_connect proc for the connection pool. Deprecated.
# Disables support for shard-specific :after_connect and :connect_sqls if used.
def after_connect=(v) # SEQUEL6: Remove
@use_old_connect_api = true
@after_connect = v
end
# An array of sql strings to execute on each new connection. Deprecated.
attr_reader :connect_sqls # SEQUEL6: Remove
# Override the connect_sqls for the connection pool. Deprecated.
# Disables support for shard-specific :after_connect and :connect_sqls if used.
def connect_sqls=(v) # SEQUEL6: Remove
@use_old_connect_api = true
@connect_sqls = v
end
# The Sequel::Database object tied to this connection pool.
attr_accessor :db
# Instantiates a connection pool with the given Database and options.
def initialize(db, opts=OPTS) # SEQUEL6: Remove second argument, always use db.opts
@db = db
@use_old_connect_api = false # SEQUEL6: Remove
@after_connect = opts[:after_connect] # SEQUEL6: Remove
@connect_sqls = opts[:connect_sqls] # SEQUEL6: Remove
@error_classes = db.send(:database_error_classes).dup.freeze
end
# An array of symbols for all shards/servers, which is a single <tt>:default</tt> by default.
def servers
[:default]
end
private
# Remove the connection from the pool. For threaded connections, this should be
# called without the mutex, because the disconnection may block.
def disconnect_connection(conn)
db.disconnect_connection(conn)
end
# Whether the given exception is a disconnect exception.
def disconnect_error?(exception)
exception.is_a?(Sequel::DatabaseDisconnectError) || db.send(:disconnect_error?, exception, OPTS)
end
# Return a new connection by calling the connection proc with the given server name,
# and checking for connection errors.
def make_new(server)
begin
if @use_old_connect_api
# SEQUEL6: Remove block
conn = @db.connect(server)
if ac = @after_connect
if ac.arity == 2
ac.call(conn, server)
else
ac.call(conn)
end
end
if cs = @connect_sqls
cs.each do |sql|
db.send(:log_connection_execute, conn, sql)
end
end
conn
else
@db.new_connection(server)
end
rescue Exception=>exception
raise Sequel.convert_exception_class(exception, Sequel::DatabaseConnectionError)
end || raise(Sequel::DatabaseConnectionError, "Connection parameters not valid")
end
end
|