File: connection_pool.rb

package info (click to toggle)
ruby-sequel 5.63.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 10,408 kB
  • sloc: ruby: 113,747; makefile: 3
file content (160 lines) | stat: -rw-r--r-- 6,040 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# frozen-string-literal: true

# The base connection pool class, which all other connection pools are based
# on.  This class is not instantiated directly, but subclasses should at
# the very least implement the following API:
#
# initialize(Database, Hash) :: Initialize using the passed Sequel::Database
#                               object and options hash.
# hold(Symbol, &block) :: Yield a connection object (obtained from the
#                         Sequel::Database object passed to +initialize+) to the current block. For sharded
#                         connection pools, the Symbol passed is the shard/server to use.
# disconnect(Symbol) :: Disconnect the connection object.  For sharded
#                       connection pools, the Symbol passed is the shard/server to use.
# servers :: An array of shard/server symbols for all shards/servers that this
#            connection pool recognizes.
# size :: an integer representing the total number of connections in the pool,
#         or for the given shard/server if sharding is supported.
# max_size :: an integer representing the maximum size of the connection pool,
#             or the maximum size per shard/server if sharding is supported.
#
# For sharded connection pools, the sharded API adds the following methods:
#
# add_servers(Array of Symbols) :: start recognizing all shards/servers specified
#                                  by the array of symbols.
# remove_servers(Array of Symbols) :: no longer recognize all shards/servers
#                                     specified by the array of symbols.
class Sequel::ConnectionPool
  OPTS = Sequel::OPTS

  # Maps each supported <tt>:pool_class</tt> shortcut to the name of the
  # constant under Sequel that implements it.  Every shortcut is registered
  # under both its symbol and its string form.
  POOL_CLASS_MAP = {
    :threaded => :ThreadedConnectionPool,
    :single => :SingleConnectionPool,
    :sharded_threaded => :ShardedThreadedConnectionPool,
    :sharded_single => :ShardedSingleConnectionPool,
    :timed_queue => :TimedQueueConnectionPool,
  }
  POOL_CLASS_MAP.keys.each{|key| POOL_CLASS_MAP[key.to_s] = POOL_CLASS_MAP[key]}
  POOL_CLASS_MAP.freeze

  # Class-level methods for picking the appropriate pool subclass, kept in
  # their own module so that extensions can override them more easily.
  module ClassMethods
    # Build and return a pool instance for the given Database and options.
    # If the <tt>:pool_class</tt> option is provided, that pool class is
    # used; otherwise the subclass is selected from the
    # <tt>:single_threaded</tt> and <tt>:servers</tt> options.
    def get_pool(db, opts = OPTS)
      pool_class = connection_pool_class(opts)
      pool_class.new(db, opts)
    end
    
    private
    
    # Resolve the given options to a connection pool class.  Raises
    # Sequel::Error if <tt>:pool_class</tt> is neither a Class nor one of
    # the shortcuts in POOL_CLASS_MAP.
    def connection_pool_class(opts)
      requested = opts[:pool_class]

      unless requested
        # No explicit pool class: derive the shortcut from the threading and
        # sharding options, then resolve it through this same method.
        shortcut = if opts[:single_threaded]
          opts[:servers] ? :sharded_single : :single
        else
          opts[:servers] ? :sharded_threaded : :threaded
        end

        return connection_pool_class(:pool_class=>shortcut)
      end

      # A Class is used as given, without any lookup.
      return requested if requested.is_a?(Class)

      const_name = POOL_CLASS_MAP[requested]
      unless const_name
        raise Sequel::Error, "unsupported connection pool type, please pass appropriate class as the :pool_class option"
      end

      # The implementing file is only loaded once its pool type is requested.
      require_relative "connection_pool/#{requested}"
      Sequel.const_get(const_name)
    end
  end
  extend ClassMethods

  # The after_connect proc for this pool.  When the old connect API is in
  # use, it is called with each newly made connection (and also the server
  # name if its arity is 2).  Deprecated.
  attr_reader :after_connect # SEQUEL6: Remove

  # Replace the after_connect proc for this pool.  Deprecated.
  # Using this switches the pool to the old connect API, which disables
  # support for shard-specific :after_connect and :connect_sqls.
  def after_connect=(v) # SEQUEL6: Remove
    @use_old_connect_api = true
    @after_connect = v
  end

  # The SQL strings executed on each new connection when the old connect
  # API is in use.  Deprecated.
  attr_reader :connect_sqls # SEQUEL6: Remove

  # Replace the connect_sqls for this pool.  Deprecated.
  # Using this switches the pool to the old connect API, which disables
  # support for shard-specific :after_connect and :connect_sqls.
  def connect_sqls=(v) # SEQUEL6: Remove
    @use_old_connect_api = true
    @connect_sqls = v
  end

  # The Sequel::Database object this connection pool belongs to.
  attr_accessor :db

  # Set up the pool for the given Database and options hash.
  def initialize(db, opts=OPTS) # SEQUEL6: Remove second argument, always use db.opts
    @db = db
    @use_old_connect_api = false # SEQUEL6: Remove
    @after_connect = opts[:after_connect] # SEQUEL6: Remove
    @connect_sqls = opts[:connect_sqls] # SEQUEL6: Remove

    # Keep a private frozen copy of the database's error classes.
    error_classes = db.send(:database_error_classes)
    @error_classes = error_classes.dup.freeze
  end
  
  # The shards/servers known to this pool, as an array of symbols.  The base
  # implementation only knows <tt>:default</tt>.
  def servers
    [:default]
  end
  
  private

  # Disconnect the given connection by delegating to the Database.  For
  # threaded connections, call this without holding the mutex, because the
  # disconnection may block.
  def disconnect_connection(conn)
    db.disconnect_connection(conn)
  end

  # True if the exception is a Sequel::DatabaseDisconnectError; otherwise
  # the result of asking the Database whether it is a disconnect error.
  def disconnect_error?(exception)
    return true if exception.is_a?(Sequel::DatabaseDisconnectError)

    db.send(:disconnect_error?, exception, OPTS)
  end
  
  # Create and return a new connection for the given server name.  Any
  # exception raised while connecting is converted to a
  # Sequel::DatabaseConnectionError, and the same error class is raised if
  # no connection object is produced.
  def make_new(server)
    connection = begin
      if @use_old_connect_api
        # SEQUEL6: Remove block
        raw_conn = @db.connect(server)

        hook = @after_connect
        if hook
          # A two-argument hook also receives the server name.
          hook_args = hook.arity == 2 ? [raw_conn, server] : [raw_conn]
          hook.call(*hook_args)
        end

        statements = @connect_sqls
        if statements
          statements.each{|sql| db.send(:log_connection_execute, raw_conn, sql)}
        end

        raw_conn
      else
        @db.new_connection(server)
      end
    rescue Exception => e
      raise Sequel.convert_exception_class(e, Sequel::DatabaseConnectionError)
    end

    connection || raise(Sequel::DatabaseConnectionError, "Connection parameters not valid")
  end
end