File: connection_pool.rb

package info (click to toggle)
ruby-sequel 5.41.0-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 9,548 kB
  • sloc: ruby: 104,241; makefile: 3
file content (143 lines) | stat: -rw-r--r-- 5,632 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# frozen-string-literal: true

# The base connection pool class, which all other connection pools are based
# on.  This class is not instantiated directly, but subclasses should at
# the very least implement the following API:
#
# initialize(Database, Hash) :: Initialize using the passed Sequel::Database
#                               object and options hash.
# hold(Symbol, &block) :: Yield a connection object (obtained from calling
#                         the block passed to +initialize+) to the current block. For sharded
#                         connection pools, the Symbol passed is the shard/server to use.
# disconnect(Symbol) :: Disconnect the connection object.  For sharded
#                       connection pools, the Symbol passed is the shard/server to use.
# servers :: An array of shard/server symbols for all shards/servers that this
#            connection pool recognizes.
# size :: an integer representing the total number of connections in the pool,
#         or for the given shard/server if sharding is supported.
# max_size :: an integer representing the maximum size of the connection pool,
#             or the maximum size per shard/server if sharding is supported.
#
# For sharded connection pools, the sharded API adds the following methods:
#
# add_servers(Array of Symbols) :: start recognizing all shards/servers specified
#                                  by the array of symbols.
# remove_servers(Array of Symbols) :: no longer recognize all shards/servers
#                                     specified by the array of symbols.
class Sequel::ConnectionPool
  OPTS = Sequel::OPTS
  POOL_CLASS_MAP = {
    :threaded => :ThreadedConnectionPool,
    :single => :SingleConnectionPool,
    :sharded_threaded => :ShardedThreadedConnectionPool,
    :sharded_single => :ShardedSingleConnectionPool
  }.freeze

  # Class methods used to return an appropriate pool subclass, separated
  # into a module for easier overridding by extensions.
  module ClassMethods
    # Return a pool subclass instance based on the given options.  If a <tt>:pool_class</tt>
    # option is provided is provided, use that pool class, otherwise
    # use a new instance of an appropriate pool subclass based on the
    # <tt>:single_threaded</tt> and <tt>:servers</tt> options.
    def get_pool(db, opts = OPTS)
      connection_pool_class(opts).new(db, opts)
    end
    
    private
    
    # Return a connection pool class based on the given options.
    def connection_pool_class(opts)
      if pc = opts[:pool_class]
        unless pc.is_a?(Class)
          unless name = POOL_CLASS_MAP[pc]
            raise Sequel::Error, "unsupported connection pool type, please pass appropriate class as the :pool_class option"
          end

          require_relative "connection_pool/#{pc}"
          pc = Sequel.const_get(name)
        end

        pc
      else
        pc = if opts[:single_threaded]
          opts[:servers] ? :sharded_single : :single
        else
          opts[:servers] ? :sharded_threaded : :threaded
        end

        connection_pool_class(:pool_class=>pc)
      end
    end
  end
  extend ClassMethods

  # The after_connect proc used for this pool.  This is called with each new
  # connection made, and is usually used to set custom per-connection settings.
  attr_accessor :after_connect

  # An array of sql strings to execute on each new connection.
  attr_accessor :connect_sqls

  # The Sequel::Database object tied to this connection pool.
  attr_accessor :db

  # Instantiates a connection pool with the given options.  The block is called
  # with a single symbol (specifying the server/shard to use) every time a new
  # connection is needed.  The following options are respected for all connection
  # pools:
  # :after_connect :: A callable object called after each new connection is made, with the
  #                   connection object (and server argument if the callable accepts 2 arguments),
  #                   useful for customizations that you want to apply to all connections.
  # :connect_sqls :: An array of sql strings to execute on each new connection, after :after_connect runs.
  def initialize(db, opts=OPTS)
    @db = db
    @after_connect = opts[:after_connect]
    @connect_sqls = opts[:connect_sqls]
    @error_classes = db.send(:database_error_classes).dup.freeze
  end
  
  # An array of symbols for all shards/servers, which is a single <tt>:default</tt> by default.
  def servers
    [:default]
  end
  
  private

  # Remove the connection from the pool.  For threaded connections, this should be
  # called without the mutex, because the disconnection may block.
  def disconnect_connection(conn)
    db.disconnect_connection(conn)
  end

  # Whether the given exception is a disconnect exception.
  def disconnect_error?(exception)
    exception.is_a?(Sequel::DatabaseDisconnectError) || db.send(:disconnect_error?, exception, OPTS)
  end
  
  # Return a new connection by calling the connection proc with the given server name,
  # and checking for connection errors.
  def make_new(server)
    begin
      conn = @db.connect(server)

      if ac = @after_connect
        if ac.arity == 2
          ac.call(conn, server)
        else
          ac.call(conn)
        end
      end

      if cs = @connect_sqls
        cs.each do |sql|
          db.send(:log_connection_execute, conn, sql)
        end
      end
    rescue Exception=>exception
      raise Sequel.convert_exception_class(exception, Sequel::DatabaseConnectionError)
    end
    raise(Sequel::DatabaseConnectionError, "Connection parameters not valid") unless conn
    conn
  end
end