File: threaded.rb

package info (click to toggle)
ruby-sequel 5.63.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 10,408 kB
  • sloc: ruby: 113,747; makefile: 3
file content (285 lines) | stat: -rw-r--r-- 9,089 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
# frozen-string-literal: true

# A connection pool allowing multi-threaded access to a pool of connections.
# This is the default connection pool used by Sequel.
class Sequel::ThreadedConnectionPool < Sequel::ConnectionPool
  USE_WAITER = true # SEQUEL6: Remove
  Sequel::Deprecation.deprecate_constant(self, :USE_WAITER)

  # The maximum number of connections this pool will create (per shard/server
  # if sharding).
  attr_reader :max_size

  # An array of connections that are available for use by the pool.
  # The calling code should already have the mutex before calling this.
  attr_reader :available_connections # SEQUEL6: Remove

  # A hash with thread/fiber keys and connection values for currently allocated connections.
  # The calling code should already have the mutex before calling this.
  attr_reader :allocated # SEQUEL6: Remove

  # The following additional options are respected:
  # :max_connections :: The maximum number of connections the connection pool
  #                     will open (default 4)
  # :pool_timeout :: The amount of seconds to wait to acquire a connection
  #                  before raising a PoolTimeout error (default 5)
  # :connection_handling :: Controls how connections are reused.  With :stack,
  #                         the most recently checked in connection is handed
  #                         out first; with :disconnect, a connection is
  #                         disconnected after each #hold instead of being
  #                         returned to the pool; any other value hands out
  #                         the least recently checked in connection first.
  #
  # Raises Sequel::Error if :max_connections is less than 1.
  def initialize(db, opts = OPTS)
    super
    @max_size = Integer(opts[:max_connections] || 4)
    raise(Sequel::Error, ':max_connections must be positive') if @max_size < 1
    @mutex = Mutex.new
    @connection_handling = opts[:connection_handling]
    @available_connections = []
    # Keyed by thread/fiber object identity rather than by #hash/#eql?.
    @allocated = {}
    @allocated.compare_by_identity
    @timeout = Float(opts[:pool_timeout] || 5)
    @waiter = ConditionVariable.new
  end

  # Yield all of the available connections, and the one currently allocated to
  # this thread.  This will not yield connections currently allocated to other
  # threads, as it is not safe to operate on them.  This holds the mutex while
  # it is yielding all of the available connections, which means that until
  # the method's block returns, the pool is locked.
  #
  # Because the mutex is held while yielding, the block must not call back
  # into pool methods that take the mutex themselves (such as #hold or #size).
  def all_connections
    hold do |c|
      sync do
        yield c
        @available_connections.each{|conn| yield conn}
      end
    end
  end

  # Removes all connections currently available. This method has the effect of
  # disconnecting from the database, assuming that no connections are currently
  # being used.  If you want to be able to disconnect connections that are
  # currently in use, use the ShardedThreadedConnectionPool, which can do that.
  # This connection pool does not, for performance reasons. To use the sharded pool,
  # pass the <tt>servers: {}</tt> option when connecting to the database.
  #
  # Once a connection is requested using #hold, the connection pool
  # creates new connections to the database.
  #
  # The +opts+ argument is not used by this pool.
  def disconnect(opts=OPTS)
    conns = nil
    sync do
      conns = @available_connections.dup
      @available_connections.clear
      # The pool just shrank, so a thread blocked in #acquire may now be able
      # to create a new connection.
      @waiter.signal
    end
    # Disconnect outside the mutex so the pool is not locked while closing.
    conns.each{|conn| disconnect_connection(conn)}
  end

  # Chooses the first available connection, or if none are
  # available, creates a new connection.  Passes the connection to the supplied
  # block:
  #
  #   pool.hold {|conn| conn.execute('DROP TABLE posts')}
  #
  # Pool#hold is re-entrant, meaning it can be called recursively in
  # the same thread without blocking.
  #
  # If no connection is immediately available and the pool is already using the maximum
  # number of connections, Pool#hold will block until a connection
  # is available or the timeout expires.  If the timeout expires before a
  # connection can be acquired, a Sequel::PoolTimeout is raised.
  #
  # The +server+ argument is not used by this pool (presumably it is accepted
  # for compatibility with the sharded pools).  Returns the block's result.
  def hold(server=nil)
    t = Sequel.current
    # Re-entrant case: this thread already owns a connection, so reuse it and
    # leave releasing it to the outermost #hold call.
    if conn = owned_connection(t)
      return yield(conn)
    end
    begin
      conn = acquire(t)
      yield conn
    rescue Sequel::DatabaseDisconnectError, *@error_classes => e
      if disconnect_error?(e)
        oconn = conn
        # Clear conn so the ensure clause below does not return the broken
        # connection to the pool.
        conn = nil
        begin
          disconnect_connection(oconn) if oconn
        ensure
          # Drop the allocation and wake a waiter even if closing the broken
          # connection raises; otherwise this thread's slot would stay
          # allocated to a dead connection.
          sync do
            @allocated.delete(t)
            @waiter.signal
          end
        end
      end
      raise
    ensure
      if conn
        sync{release(t)}
        # In :disconnect mode #release does not check the connection back in,
        # so close it here, outside the mutex.
        if @connection_handling == :disconnect
          disconnect_connection(conn)
        end
      end
    end
  end

  # The symbol identifying this type of connection pool (:threaded).
  def pool_type
    :threaded
  end

  # The total number of connections opened, either available or allocated.
  # The calling code should not have the mutex before calling this.
  def size
    @mutex.synchronize{_size}
  end

  private

  # The total number of connections opened, either available or allocated.
  # The calling code should already have the mutex before calling this.
  def _size
    @allocated.length + @available_connections.length
  end

  # Assigns a connection to the supplied thread, if one
  # is available. The calling code should NOT already have the mutex when
  # calling this.
  #
  # This should return a connection if one is available within the timeout,
  # or raise PoolTimeout if a connection could not be acquired within the timeout.
  def acquire(thread)
    # Fast path: a connection is available or can be created right away.
    if conn = assign_connection(thread)
      return conn
    end

    timeout = @timeout
    timer = Sequel.start_timer

    # Wait to be signalled (or for the timeout to pass), then take an
    # available connection if there is one.
    sync do
      @waiter.wait(@mutex, timeout)
      if conn = next_available
        return(@allocated[thread] = conn)
      end
    end

    until conn = assign_connection(thread)
      elapsed = Sequel.elapsed_seconds_since(timer)
      # :nocov:
      raise_pool_timeout(elapsed) if elapsed > timeout

      # It's difficult to get to this point, it can only happen if there is a race condition
      # where a connection cannot be acquired even after the thread is signalled by the condition variable
      sync do
        # Only wait for whatever remains of the original timeout.
        @waiter.wait(@mutex, timeout - elapsed)
        if conn = next_available
          return(@allocated[thread] = conn)
        end
      end
      # :nocov:
    end

    conn
  end

  # Assign a connection to the thread, or return nil if one cannot be assigned.
  # The caller should NOT have the mutex before calling this.
  def assign_connection(thread)
    # Thread safe as instance variable is only assigned to local variable
    # and not operated on outside mutex.
    allocated = @allocated
    do_make_new = false
    to_disconnect = nil

    sync do
      if conn = next_available
        return(allocated[thread] = conn)
      end

      if (n = _size) >= (max = @max_size)
        # The pool is full: reclaim the slots of threads that are no longer
        # alive, remembering their connections so they can be disconnected
        # outside the mutex.
        allocated.keys.each do |t|
          unless t.alive?
            (to_disconnect ||= []) << allocated.delete(t)
          end
        end
        # Force the size to be recomputed below, as it may have changed.
        n = nil
      end

      if (n || _size) < max
        # Reserve a slot with a placeholder so the pool size accounts for the
        # connection that is about to be created outside the mutex.
        do_make_new = allocated[thread] = true
      end
    end

    if to_disconnect
      to_disconnect.each{|dconn| disconnect_connection(dconn)}
    end

    # Connect to the database outside of the connection pool mutex,
    # as that can take a long time and the connection pool mutex
    # shouldn't be locked while the connection takes place.
    if do_make_new
      begin
        conn = make_new(:default)
        sync{allocated[thread] = conn}
      ensure
        # If the connection could not be created, give back the reserved slot.
        unless conn
          sync{allocated.delete(thread)}
        end
      end
    end

    conn
  end

  # Return a connection to the pool of available connections, returns the connection.
  # The calling code should already have the mutex before calling this.
  def checkin_connection(conn)
    @available_connections << conn
    conn
  end

  # Return the next available connection in the pool, or nil if there
  # is not currently an available connection.  The calling code should already
  # have the mutex before calling this.
  #
  # With :stack connection handling, the most recently checked in connection
  # is returned; otherwise, the least recently checked in one is returned.
  def next_available
    case @connection_handling
    when :stack
      @available_connections.pop
    else
      @available_connections.shift
    end
  end

  # Returns the connection owned by the supplied thread,
  # if any. The calling code should NOT already have the mutex before calling this.
  def owned_connection(thread)
    sync{@allocated[thread]}
  end

  # Create the maximum number of connections immediately.  The calling code should
  # NOT have the mutex before calling this.
  #
  # If +concurrent+ is true, each connection is created in its own thread.
  def preconnect(concurrent = false)
    # Use the synchronized #size instead of #_size, as the mutex is not held
    # here and #_size requires the caller to hold it.
    enum = (max_size - size).times

    conns = if concurrent
      enum.map{Thread.new{make_new(:default)}}.map(&:value)
    else
      enum.map{make_new(:default)}
    end

    sync{conns.each{|conn| checkin_connection(conn)}}
  end

  # Raise a PoolTimeout error showing the current timeout, the elapsed time, and the
  # database's name (if any).
  def raise_pool_timeout(elapsed)
    name = db.opts[:name]
    raise ::Sequel::PoolTimeout, "timeout: #{@timeout}, elapsed: #{elapsed}#{", database name: #{name}" if name}"
  end

  # Releases the connection assigned to the supplied thread back to the pool.
  # The calling code should already have the mutex before calling this.
  #
  # With :disconnect connection handling, the connection is not checked back
  # in; the caller is responsible for disconnecting it.  Always returns nil.
  def release(thread)
    conn = @allocated.delete(thread)

    unless @connection_handling == :disconnect
      checkin_connection(conn)
    end

    # Wake one waiting thread, as a connection or a free slot is now available.
    @waiter.signal
    nil
  end

  # Yield to the block while inside the mutex. The calling code should NOT
  # already have the mutex before calling this.
  def sync
    @mutex.synchronize{yield}
  end
end