1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
|
# frozen_string_literal: true
require 'redis_client'
require 'redis_client/cluster/errors'
require 'redis_client/cluster/noop_command_builder'
require 'redis_client/cluster/pipeline'
class RedisClient
class Cluster
class Transaction
ConsistencyError = Class.new(::RedisClient::Cluster::Error)
MAX_REDIRECTION = 2
EMPTY_ARRAY = [].freeze
private_constant :MAX_REDIRECTION, :EMPTY_ARRAY
def initialize(router, command_builder, node: nil, slot: nil, asking: false)
@router = router
@command_builder = command_builder
@retryable = true
@pipeline = ::RedisClient::Pipeline.new(::RedisClient::Cluster::NoopCommandBuilder)
@pending_commands = []
@node = node
prepare_tx unless @node.nil?
@watching_slot = slot
@asking = asking
end
def call(*command, **kwargs, &block)
command = @command_builder.generate(command, kwargs)
if prepare(command)
@pipeline.call_v(command, &block)
else
defer { @pipeline.call_v(command, &block) }
end
end
def call_v(command, &block)
command = @command_builder.generate(command)
if prepare(command)
@pipeline.call_v(command, &block)
else
defer { @pipeline.call_v(command, &block) }
end
end
def call_once(*command, **kwargs, &block)
@retryable = false
command = @command_builder.generate(command, kwargs)
if prepare(command)
@pipeline.call_once_v(command, &block)
else
defer { @pipeline.call_once_v(command, &block) }
end
end
def call_once_v(command, &block)
@retryable = false
command = @command_builder.generate(command)
if prepare(command)
@pipeline.call_once_v(command, &block)
else
defer { @pipeline.call_once_v(command, &block) }
end
end
def execute
@pending_commands.each(&:call)
return EMPTY_ARRAY if @pipeline._empty?
raise ConsistencyError.new("couldn't determine the node: #{@pipeline._commands}").with_config(@router.config) if @node.nil?
commit
end
private
def defer(&block)
@pending_commands << block
nil
end
def prepare(command)
return true unless @node.nil?
node_key = @router.find_primary_node_key(command)
return false if node_key.nil?
@node = @router.find_node(node_key)
prepare_tx
true
end
def prepare_tx
@pipeline.call('multi')
@pending_commands.each(&:call)
@pending_commands.clear
end
def commit
@pipeline.call('exec')
settle
end
def cancel
@pipeline.call('discard')
settle
end
def settle
# If we needed ASKING on the watch, we need ASKING on the multi as well.
@node.call('asking') if @asking
# Don't handle redirections at this level if we're in a watch (the watcher handles redirections
# at the whole-transaction level.)
send_transaction(@node, redirect: !!@watching_slot ? 0 : MAX_REDIRECTION)
end
def send_transaction(client, redirect:)
case client
when ::RedisClient then send_pipeline(client, redirect: redirect)
when ::RedisClient::Pooled then client.with { |c| send_pipeline(c, redirect: redirect) }
else raise NotImplementedError, "#{client.class.name}#multi for cluster client"
end
end
def send_pipeline(client, redirect:) # rubocop:disable Metrics/AbcSize
replies = client.ensure_connected_cluster_scoped(retryable: @retryable) do |connection|
commands = @pipeline._commands
client.middlewares.call_pipelined(commands, client.config) do
connection.call_pipelined(commands, nil)
rescue ::RedisClient::CommandError => e
ensure_the_same_slot!(commands)
return handle_command_error!(e, redirect: redirect) unless redirect.zero?
raise
end
end
return if replies.last.nil?
coerce_results!(replies.last)
rescue ::RedisClient::ConnectionError
@router.renew_cluster_state if @watching_slot.nil?
raise
end
def coerce_results!(results, offset: 1)
results.each_with_index do |result, index|
if result.is_a?(::RedisClient::CommandError)
result._set_command(@pipeline._commands[index + offset])
raise result
end
next if @pipeline._blocks.nil?
block = @pipeline._blocks[index + offset]
next if block.nil?
results[index] = block.call(result)
end
results
end
def handle_command_error!(err, redirect:) # rubocop:disable Metrics/AbcSize
if err.message.start_with?('CROSSSLOT')
raise ConsistencyError.new("#{err.message}: #{err.command}").with_config(@router.config)
elsif err.message.start_with?('MOVED')
node = @router.assign_redirection_node(err.message)
send_transaction(node, redirect: redirect - 1)
elsif err.message.start_with?('ASK')
node = @router.assign_asking_node(err.message)
try_asking(node) ? send_transaction(node, redirect: redirect - 1) : err
elsif err.message.start_with?('CLUSTERDOWN')
@router.renew_cluster_state if @watching_slot.nil?
raise err
else
raise err
end
end
def ensure_the_same_slot!(commands)
slots = commands.map { |command| @router.find_slot(command) }.compact.uniq
return if slots.size == 1 && @watching_slot.nil?
return if slots.size == 1 && @watching_slot == slots.first
raise ConsistencyError.new("the transaction should be executed to a slot in a node: #{commands}").with_config(@router.config)
end
def try_asking(node)
node.call('asking') == 'OK'
rescue StandardError
false
end
end
end
end
|