File: temporarily_release_connection.rb

package info (click to toggle)
ruby-sequel 5.97.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 11,188 kB
  • sloc: ruby: 123,115; makefile: 3
file content (178 lines) | stat: -rw-r--r-- 5,955 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# frozen-string-literal: true
#
# The temporarily_release_connection extension adds support for temporarily
# releasing a checked out connection back to the connection pool.  It is
# designed for use in multithreaded transactional integration tests, allowing
# a connection to start a transaction in one thread, but be temporarily
# released back to the connection pool, so it can be operated on safely
# by multiple threads inside a block. For example, the main thread could be
# running tests that send web requests, and a separate thread running a web
# server that is responding to those requests, and the same connection and
# transaction would be used for both.
#
# To load the extension into the database:
#
#   DB.extension :temporarily_release_connection
#
# After the extension is loaded, call the +temporarily_release_connection+
# method with the connection object to temporarily release the connection
# back to the pool. Example:
#
#   DB.transaction(rollback: :always, auto_savepoint: true) do |conn|
#     DB.temporarily_release_connection(conn) do
#       # Other threads can operate on connection safely inside the transaction
#       yield
#     end
#   end
#
# For sharded connection pools, the second argument to +temporarily_release_connection+
# is respected, and specifies the server on which to temporarily release the connection.
#
# The temporarily_release_connection extension is only supported with the
# threaded and timed_queue connection pools that ship with Sequel (and the sharded
# versions of each).  To make sure that same connection object can be reacquired, it
# is only supported if the maximum connection pool size is 1, so set the Database
# :max_connections option to 1 if you plan to use this extension.
#
# If the +temporarily_release_connection+ method cannot reacquire the same connection
# it released to the pool, it will raise a Sequel::UnableToReacquireConnectionError
# exception.  This should only happen if the connection has been disconnected
# while it was temporarily released.  If this error is raised, Database#transaction
# will not rollback the transaction, since the connection object is likely no longer
# valid, and on poorly written database drivers, that could cause the process to crash.
#
# Related modules: Sequel::TemporarilyReleaseConnection,
# Sequel::UnableToReacquireConnectionError

#
module Sequel
  # Error class raised if the connection pool does not provide the same connection
  # object when checking a temporarily released connection out.
  class UnableToReacquireConnectionError < Error
  end

  module TemporarilyReleaseConnection
    module DatabaseMethods
      # Temporarily release the connection back to the connection pool for the
      # duration of the block.
      def temporarily_release_connection(conn, server=:default, &block)
        pool.temporarily_release_connection(conn, server, &block)
      end

      private

      # Do nothing if UnableToReacquireConnectionError is raised, as it is
      # likely the connection is not in a usable state.
      def rollback_transaction(conn, opts)
        return if UnableToReacquireConnectionError === $!
        super
      end
    end

    module PoolMethods
      # Temporarily release a currently checked out connection, then yield to the block. Reacquire the same
      # connection upon the exit of the block.
      def temporarily_release_connection(conn, server)
        t = Sequel.current
        raise Error, "connection not currently checked out" unless conn.equal?(trc_owned_connection(t, server))

        begin
          trc_release(t, conn, server)
          yield
        ensure
          c = trc_acquire(t, server)
          unless conn.equal?(c)
            raise UnableToReacquireConnectionError, "reacquired connection not the same as initial connection"
          end
        end
      end
    end

    module TimedQueue
      private

      def trc_owned_connection(t, server)
        owned_connection(t)
      end

      def trc_release(t, conn, server)
        release(t)
      end

      def trc_acquire(t, server)
        acquire(t)
      end
    end

    module ShardedTimedQueue
      # Normalize the server name for sharded connection pools
      def temporarily_release_connection(conn, server)
        server = pick_server(server)
        super
      end

      private

      def trc_owned_connection(t, server)
        owned_connection(t, server)
      end

      def trc_release(t, conn, server)
        release(t, conn, server)
      end

      def trc_acquire(t, server)
        acquire(t, server)
      end
    end

    module ThreadedBase
      private

      def trc_release(t, conn, server)
        sync{super}
      end
    end

    module Threaded
      include TimedQueue
      include ThreadedBase
    end

    module ShardedThreaded
      include ShardedTimedQueue
      include ThreadedBase
    end
  end

  trc = TemporarilyReleaseConnection
  trc_map = {
    :threaded => trc::Threaded,
    :sharded_threaded => trc::ShardedThreaded,
    :timed_queue => trc::TimedQueue,
    :sharded_timed_queue => trc::ShardedTimedQueue,
  }.freeze

  Database.register_extension(:temporarily_release_connection) do |db|
    unless pool_mod = trc_map[db.pool.pool_type]
      raise(Error, "temporarily_release_connection extension not supported for connection pool type #{db.pool.pool_type}")
    end

    case db.pool.pool_type
    when :threaded, :sharded_threaded
      if db.opts[:connection_handling] == :disconnect
        raise Error, "temporarily_release_connection extension not supported with connection_handling: :disconnect option"
      end
    end

    unless db.pool.max_size == 1
      raise Error, "temporarily_release_connection extension not supported unless :max_connections option is 1"
    end

    db.extend(trc::DatabaseMethods)
    db.pool.extend(trc::PoolMethods)
    db.pool.extend(pool_mod)
  end

  private_constant :TemporarilyReleaseConnection
end