1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
|
# frozen-string-literal: true
#
# The server_block extension adds the Database#with_server method, which takes a shard
# argument and a block, and makes it so that access inside the block will use the
# specified shard by default.
#
# First, you need to enable it on the database object:
#
# DB.extension :server_block
#
# Then you can call with_server:
#
# DB.with_server(:shard1) do
# DB[:a].all # Uses shard1
# DB[:a].server(:shard2).all # Uses shard2
# end
# DB[:a].all # Uses default
#
# You can nest calls to with_server:
#
# DB.with_server(:shard1) do
# DB[:a].all # Uses shard1
# DB.with_server(:shard2) do
# DB[:a].all # Uses shard2
# end
# DB[:a].all # Uses shard1
# end
# DB[:a].all # Uses default
#
# Note that if you pass the nil, :default, or :read_only server/shard
# names to Dataset#server inside a with_server block, they will be
# ignored and the server/shard given to with_server will be used:
#
# DB.with_server(:shard1) do
# DB[:a].all # Uses shard1
# DB[:a].server(:shard2).all # Uses shard2
# DB[:a].server(nil).all # Uses shard1
# DB[:a].server(:default).all # Uses shard1
# DB[:a].server(:read_only).all # Uses shard1
# end
#
# If you pass two separate shards to with_server, the second shard will
# be used instead of the :read_only shard, and the first shard will be
# used instead of the :default shard:
#
# DB.with_server(:shard1, :shard2) do
# DB[:a].all # Uses shard2
# DB[:a].delete # Uses shard1
# DB[:a].server(:shard3).all # Uses shard3
# DB[:a].server(:shard3).delete # Uses shard3
# DB[:a].server(:default).all # Uses shard1
# DB[:a].server(:read_only).delete # Uses shard2
# end
#
# If you use an invalid server when calling with_server, it will be
# treated the same way as if you called Dataset#server with an invalid
# server. By default, the default server will be used in such cases.
# If you would like a different server to be used, or an exception to
# be raised, then use the :servers_hash Database option.
#
# Related modules: Sequel::ServerBlock, Sequel::UnthreadedServerBlock,
# Sequel::ThreadedServerBlock
#
module Sequel
module ServerBlock
# Enable the server block on the connection pool, choosing the correct
# extension depending on whether the connection pool is threaded or not.
# Also defines the with_server method on the receiver for easy use.
def self.extended(db)
pool = db.pool
if defined?(ShardedThreadedConnectionPool) && pool.is_a?(ShardedThreadedConnectionPool)
pool.extend(ThreadedServerBlock)
pool.instance_variable_set(:@default_servers, {})
else
pool.extend(UnthreadedServerBlock)
pool.instance_variable_set(:@default_servers, [])
end
end
# Delegate to the connection pool
def with_server(default_server, read_only_server=default_server, &block)
pool.with_server(default_server, read_only_server, &block)
end
end
# Adds with_server support for the sharded single connection pool.
module UnthreadedServerBlock
# Set a default server/shard to use inside the block.
def with_server(default_server, read_only_server=default_server)
set_default_server(default_server, read_only_server)
yield
ensure
clear_default_server
end
private
# Make the given server the new default server.
def set_default_server(default_server, read_only_server=default_server)
@default_servers << [default_server, read_only_server]
end
# Remove the current default server, restoring the
# previous default server.
def clear_default_server
@default_servers.pop
end
# Use the server given to with_server if appropriate.
def pick_server(server)
if @default_servers.empty?
super
else
case server
when :default, nil
@servers[@default_servers[-1][0]]
when :read_only
@servers[@default_servers[-1][1]]
else
super
end
end
end
end
# Adds with_server support for the sharded threaded connection pool.
module ThreadedServerBlock
# Set a default server/shard to use inside the block for the current
# thread.
def with_server(default_server, read_only_server=default_server)
set_default_server(default_server, read_only_server)
yield
ensure
clear_default_server
end
private
# Make the given server the new default server for the current thread.
def set_default_server(default_server, read_only_server=default_server)
sync{(@default_servers[Sequel.current] ||= [])} << [default_server, read_only_server]
end
# Remove the current default server for the current thread, restoring the
# previous default server.
def clear_default_server
t = Sequel.current
a = sync{@default_servers[t]}
a.pop
sync{@default_servers.delete(t)} if a.empty?
end
# Use the server given to with_server for the given thread, if appropriate.
def pick_server(server)
a = sync{@default_servers[Sequel.current]}
if !a || a.empty?
super
else
# Hash handling required to work when loaded after arbitrary servers plugin.
case server
when :default, nil
v = a[-1][0]
v = @servers[v] unless v.is_a?(Hash)
v
when :read_only
v = a[-1][1]
v = @servers[v] unless v.is_a?(Hash)
v
else
super
end
end
end
end
Database.register_extension(:server_block, ServerBlock)
end
|