File: threaded.rb

package info (click to toggle)
ruby-sequel 4.37.0-1
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 8,076 kB
  • ctags: 5,629
  • sloc: ruby: 91,441; makefile: 2
file content (286 lines) | stat: -rw-r--r-- 9,525 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
# frozen-string-literal: true

# A connection pool allowing multi-threaded access to a pool of connections.
# This is the default connection pool used by Sequel.
class Sequel::ThreadedConnectionPool < Sequel::ConnectionPool
  # Whether or not a ConditionVariable should be used to wait for connections.
  # True except on ruby 1.8, where ConditionVariable#wait does not support a
  # timeout.
  unless defined?(USE_WAITER)
    USE_WAITER = RUBY_VERSION >= '1.9'
  end

  # The maximum number of connections this pool will create (per shard/server
  # if sharding).
  attr_reader :max_size
  
  # An array of connections that are available for use by the pool.
  attr_reader :available_connections
  
  # A hash with thread keys and connection values for currently allocated
  # connections.
  attr_reader :allocated

  # The following additional options are respected:
  # * :connection_handling - Set how to handle available connections.  By default,
  #   uses a a queue for fairness.  Can be set to :stack to use a stack, which may
  #   offer better performance.
  # * :max_connections - The maximum number of connections the connection pool
  #   will open (default 4)
  # * :pool_sleep_time - The amount of time to sleep before attempting to acquire
  #   a connection again, only used on ruby 1.8. (default 0.001)
  # * :pool_timeout - The amount of seconds to wait to acquire a connection
  #   before raising a PoolTimeoutError (default 5)
  def initialize(db, opts = OPTS)
    super
    @max_size = Integer(opts[:max_connections] || 4)
    raise(Sequel::Error, ':max_connections must be positive') if @max_size < 1
    @mutex = Mutex.new  
    @connection_handling = opts[:connection_handling]
    @available_connections = []
    @allocated = {}
    @timeout = Float(opts[:pool_timeout] || 5)

    if USE_WAITER
      @waiter = ConditionVariable.new
    else
      # :nocov:
      @sleep_time = Float(opts[:pool_sleep_time] || 0.001)
      # :nocov:
    end
  end
  
  # Yield all of the available connections, and the one currently allocated to
  # this thread.  This will not yield connections currently allocated to other
  # threads, as it is not safe to operate on them.  This holds the mutex while
  # it is yielding all of the available connections, which means that until
  # the method's block returns, the pool is locked.
  def all_connections
    hold do |c|
      sync do
        yield c
        @available_connections.each{|conn| yield conn}
      end
    end
  end
  
  # Removes all connections currently available, optionally
  # yielding each connection to the given block. This method has the effect of 
  # disconnecting from the database, assuming that no connections are currently
  # being used.  If you want to be able to disconnect connections that are
  # currently in use, use the ShardedThreadedConnectionPool, which can do that.
  # This connection pool does not, for performance reasons. To use the sharded pool,
  # pass the <tt>:servers=>{}</tt> option when connecting to the database.
  # 
  # Once a connection is requested using #hold, the connection pool
  # creates new connections to the database.
  def disconnect(opts=OPTS)
    conns = nil
    sync do
      conns = @available_connections.dup
      @available_connections.clear
    end
    conns.each{|conn| disconnect_connection(conn)}
  end
  
  # Chooses the first available connection, or if none are
  # available, creates a new connection.  Passes the connection to the supplied
  # block:
  # 
  #   pool.hold {|conn| conn.execute('DROP TABLE posts')}
  # 
  # Pool#hold is re-entrant, meaning it can be called recursively in
  # the same thread without blocking.
  #
  # If no connection is immediately available and the pool is already using the maximum
  # number of connections, Pool#hold will block until a connection
  # is available or the timeout expires.  If the timeout expires before a
  # connection can be acquired, a Sequel::PoolTimeout is 
  # raised.
  def hold(server=nil)
    t = Thread.current
    if conn = owned_connection(t)
      return yield(conn)
    end
    begin
      conn = acquire(t)
      yield conn
    rescue Sequel::DatabaseDisconnectError, *@error_classes => e
      if disconnect_error?(e)
        oconn = conn
        conn = nil
        disconnect_connection(oconn) if oconn
        @allocated.delete(t)
      end
      raise
    ensure
      sync{release(t)} if conn
    end
  end

  def pool_type
    :threaded
  end
  
  # The total number of connections opened, either available or allocated.
  # This may not be completely accurate as it isn't protected by the mutex.
  def size
    @allocated.length + @available_connections.length
  end
  
  private

  # Assigns a connection to the supplied thread, if one
  # is available. The calling code should already have the mutex when
  # calling this.
  def _acquire(thread)
    if conn = available
      @allocated[thread] = conn
    end
  end
  
  if USE_WAITER
    # Assigns a connection to the supplied thread, if one
    # is available. The calling code should NOT already have the mutex when
    # calling this.
    #
    # This should return a connection is one is available within the timeout,
    # or nil if a connection could not be acquired within the timeout.
    def acquire(thread)
      sync do
        if conn = _acquire(thread)
          return conn
        end

        time = Time.now
        @waiter.wait(@mutex, @timeout)

        # Not sure why this is helpful, but calling Thread.pass after conditional
        # variable access dramatically increases reliability when under heavy
        # resource contention (almost eliminating timeouts), at a small cost to
        # runtime performance.
        Thread.pass

        until conn = _acquire(thread)
          deadline ||= time + @timeout
          current_time = Time.now
          raise_pool_timeout(current_time - time) if current_time > deadline
          # :nocov:
          # It's difficult to get to this point, it can only happen if there is a race condition
          # where a connection cannot be acquired even after the thread is signalled by the condition
          @waiter.wait(@mutex, deadline - current_time)
          Thread.pass
          # :nocov:
        end

        conn
      end
    end
  else
    # :nocov:
    def acquire(thread)
      unless conn = sync{_acquire(thread)}
        time = Time.now
        timeout = time + @timeout
        sleep_time = @sleep_time
        sleep sleep_time
        until conn = sync{_acquire(thread)}
          raise_pool_timeout(Time.now - time) if Time.now > timeout
          sleep sleep_time
        end
      end
      conn
    end
    # :nocov:
  end

  # Returns an available connection. If no connection is
  # available, tries to create a new connection. The calling code should already
  # have the mutex before calling this.
  def available
    next_available || make_new(DEFAULT_SERVER)
  end

  # Return a connection to the pool of available connections, returns the connection.
  # The calling code should already have the mutex before calling this.
  def checkin_connection(conn)
    @available_connections << conn
    if USE_WAITER
      @waiter.signal
      Thread.pass
    end
    conn
  end

  unless method_defined?(:default_make_new)
    # Alias the default make_new method, so subclasses can call it directly.
    alias default_make_new make_new
  end
  
  # Creates a new connection to the given server if the size of the pool for
  # the server is less than the maximum size of the pool. The calling code
  # should already have the mutex before calling this.
  def make_new(server)
    if (n = size) >= @max_size
      @allocated.keys.each{|t| release(t) unless t.alive?}
      n = nil
    end
    super if (n || size) < @max_size
  end

  # Return the next available connection in the pool, or nil if there
  # is not currently an available connection.  The calling code should already
  # have the mutex before calling this.
  def next_available
    case @connection_handling
    when :stack
      @available_connections.pop
    else
      @available_connections.shift
    end
  end

  # Returns the connection owned by the supplied thread,
  # if any. The calling code should NOT already have the mutex before calling this.
  def owned_connection(thread)
    sync{@allocated[thread]}
  end
  
  # Create the maximum number of connections immediately.
  def preconnect(concurrent = false)
    enum = (max_size - size).times

    if concurrent
      enum.map{Thread.new{make_new(nil)}}.map(&:join).each{|t| checkin_connection(t.value)}
    else
      enum.each{checkin_connection(make_new(nil))}
    end
  end

  # Raise a PoolTimeout error showing the current timeout, the elapsed time, and the
  # database's name (if any).
  def raise_pool_timeout(elapsed)
    name = db.opts[:name]
    raise ::Sequel::PoolTimeout, "timeout: #{@timeout}, elapsed: #{elapsed}#{", database name: #{name}" if name}"
  end
  
  # Releases the connection assigned to the supplied thread back to the pool.
  # The calling code should already have the mutex before calling this.
  def release(thread)
    conn = @allocated.delete(thread)

    if @connection_handling == :disconnect
      disconnect_connection(conn)
    else
      checkin_connection(conn)
    end
  end

  # Yield to the block while inside the mutex. The calling code should NOT
  # already have the mutex before calling this.
  def sync
    @mutex.synchronize{yield}
  end
  
  CONNECTION_POOL_MAP[[false, false]] = self
end