1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
|
# frozen-string-literal: true
#
# The temporarily_release_connection extension adds support for temporarily
# releasing a checked out connection back to the connection pool. It is
# designed for use in multithreaded transactional integration tests, allowing
# a connection to start a transaction in one thread, but be temporarily
# released back to the connection pool, so it can be operated on safely
# by multiple threads inside a block. For example, the main thread could be
# running tests that send web requests, and a separate thread running a web
# server that is responding to those requests, and the same connection and
# transaction would be used for both.
#
# To load the extension into the database:
#
#   DB.extension :temporarily_release_connection
#
# After the extension is loaded, call the +temporarily_release_connection+
# method with the connection object to temporarily release the connection
# back to the pool. Example:
#
#   DB.transaction(rollback: :always, auto_savepoint: true) do |conn|
#     DB.temporarily_release_connection(conn) do
#       # Other threads can operate on connection safely inside the transaction
#       yield
#     end
#   end
#
# For sharded connection pools, the second argument to +temporarily_release_connection+
# is respected, and specifies the server on which to temporarily release the connection.
#
# The temporarily_release_connection extension is only supported with the
# threaded and timed_queue connection pools that ship with Sequel (and the sharded
# versions of each). To make sure that the same connection object can be reacquired, it
# is only supported if the maximum connection pool size is 1, so set the Database
# :max_connections option to 1 if you plan to use this extension.
#
# If the +temporarily_release_connection+ method cannot reacquire the same connection
# it released to the pool, it will raise a Sequel::UnableToReacquireConnectionError
# exception. This should only happen if the connection has been disconnected
# while it was temporarily released. If this error is raised, Database#transaction
# will not roll back the transaction, since the connection object is likely no longer
# valid, and on poorly written database drivers, that could cause the process to crash.
#
# Related modules: Sequel::TemporarilyReleaseConnection,
# Sequel::UnableToReacquireConnectionError
#
module Sequel
# Raised when the connection checked back out of the pool after a temporary
# release is not the same object as the connection that was released.
class UnableToReacquireConnectionError < Error; end
module TemporarilyReleaseConnection
module DatabaseMethods
  # Hand +conn+ back to the connection pool while the block runs, by
  # delegating to the pool's +temporarily_release_connection+ method.
  # +server+ is passed through to the pool and defaults to +:default+.
  def temporarily_release_connection(conn, server=:default, &block)
    connection_pool = pool
    connection_pool.temporarily_release_connection(conn, server, &block)
  end

  private

  # Skip the rollback when the exception currently being handled (<tt>$!</tt>)
  # is an UnableToReacquireConnectionError, as the connection is likely not
  # in a usable state. Otherwise defer to the inherited implementation.
  def rollback_transaction(conn, opts)
    case $!
    when UnableToReacquireConnectionError
      nil
    else
      super
    end
  end
end
module PoolMethods
  # Release +conn+ (which must be the connection currently checked out for
  # the value of Sequel.current on +server+) back to the pool, yield, and
  # check a connection back out once the block exits, whether it returned
  # normally or raised.
  #
  # Raises Error before releasing anything if +conn+ is not the connection
  # reported by +trc_owned_connection+. Raises UnableToReacquireConnectionError
  # if the connection returned by +trc_acquire+ is a different object.
  # Comparisons use object identity (+equal?+).
  def temporarily_release_connection(conn, server)
    current = Sequel.current
    owned = trc_owned_connection(current, server)
    unless conn.equal?(owned)
      raise Error, "connection not currently checked out"
    end

    begin
      trc_release(current, conn, server)
      yield
    ensure
      # Runs on every exit path after the ownership check, so an exception
      # raised here replaces any exception raised by the block.
      reacquired = trc_acquire(current, server)
      raise UnableToReacquireConnectionError, "reacquired connection not the same as initial connection" unless conn.equal?(reacquired)
    end
  end
end
module TimedQueue
  private

  # Connection the pool reports as owned by +thread+. The server argument
  # is accepted for a uniform hook signature but is not used here.
  def trc_owned_connection(thread, _server)
    owned_connection(thread)
  end

  # Release whatever +thread+ has checked out. The connection and server
  # arguments are not used here.
  def trc_release(thread, _conn, _server)
    release(thread)
  end

  # Check a connection out for +thread+. The server argument is not used here.
  def trc_acquire(thread, _server)
    acquire(thread)
  end
end
module ShardedTimedQueue
  # Resolve +server+ through +pick_server+ before handing off to the
  # inherited implementation, so the rest of the release/reacquire cycle
  # sees the normalized server name. The caller's block is forwarded.
  def temporarily_release_connection(conn, server)
    super(conn, pick_server(server))
  end

  private

  # Connection the pool reports as owned by +thread+ on +server+.
  def trc_owned_connection(thread, server)
    owned_connection(thread, server)
  end

  # Release +conn+, held by +thread+, back to +server+.
  def trc_release(thread, conn, server)
    release(thread, conn, server)
  end

  # Check a connection out for +thread+ on +server+.
  def trc_acquire(thread, server)
    acquire(thread, server)
  end
end
module ThreadedBase
  private

  # Run the next +trc_release+ in the ancestor chain, with the same
  # arguments, inside the pool's +sync+ block.
  def trc_release(thread, conn, server)
    sync do
      super(thread, conn, server)
    end
  end
end
# Hooks for the :threaded pool type: the TimedQueue hooks, with trc_release
# wrapped by ThreadedBase.
#
# Include order matters. ThreadedBase is included last so that it precedes
# TimedQueue in method lookup; its trc_release then reaches
# TimedQueue#trc_release through super, inside the sync block.
module Threaded
include TimedQueue
include ThreadedBase
end
# Hooks for the :sharded_threaded pool type: the ShardedTimedQueue methods,
# with trc_release wrapped by ThreadedBase.
#
# Include order matters. ThreadedBase is included last so that it precedes
# ShardedTimedQueue in method lookup; its trc_release then reaches
# ShardedTimedQueue#trc_release through super, inside the sync block.
module ShardedThreaded
include ShardedTimedQueue
include ThreadedBase
end
end
trc_mod = TemporarilyReleaseConnection

# Pool type => module providing that pool's release/reacquire hooks.
pool_mods = {
  :threaded => trc_mod::Threaded,
  :sharded_threaded => trc_mod::ShardedThreaded,
  :timed_queue => trc_mod::TimedQueue,
  :sharded_timed_queue => trc_mod::ShardedTimedQueue,
}.freeze

# Registration block for the extension. It validates the database's pool
# (supported pool type, no connection_handling: :disconnect on the threaded
# pools, maximum size of exactly 1) and raises Error on the first check that
# fails. Only then does it extend the database and its pool.
Database.register_extension(:temporarily_release_connection) do |db|
  pool = db.pool
  pool_type = pool.pool_type

  pool_mod = pool_mods.fetch(pool_type) do
    raise(Error, "temporarily_release_connection extension not supported for connection pool type #{pool_type}")
  end

  if [:threaded, :sharded_threaded].include?(pool_type) && db.opts[:connection_handling] == :disconnect
    raise Error, "temporarily_release_connection extension not supported with connection_handling: :disconnect option"
  end

  raise Error, "temporarily_release_connection extension not supported unless :max_connections option is 1" unless pool.max_size == 1

  db.extend(trc_mod::DatabaseMethods)
  # PoolMethods goes in first so the pool-type module sits ahead of it in
  # method lookup and can reach it via super.
  pool.extend(trc_mod::PoolMethods)
  pool.extend(pool_mod)
end

private_constant :TemporarilyReleaseConnection
end
|