1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
|
# frozen_string_literal: true
require 'redis-cluster-client'
require 'redis/cluster/transaction_adapter'
class Redis
class Cluster
class Client < RedisClient::Cluster
ERROR_MAPPING = ::Redis::Client::ERROR_MAPPING.merge(
RedisClient::Cluster::InitialSetupError => Redis::Cluster::InitialSetupError,
RedisClient::Cluster::OrchestrationCommandNotSupported => Redis::Cluster::OrchestrationCommandNotSupported,
RedisClient::Cluster::AmbiguousNodeError => Redis::Cluster::AmbiguousNodeError,
RedisClient::Cluster::ErrorCollection => Redis::Cluster::CommandErrorCollection,
RedisClient::Cluster::Transaction::ConsistencyError => Redis::Cluster::TransactionConsistencyError,
RedisClient::Cluster::NodeMightBeDown => Redis::Cluster::NodeMightBeDown,
)
class << self
def config(**kwargs)
super(protocol: 2, **kwargs)
end
def sentinel(**kwargs)
super(protocol: 2, **kwargs)
end
def translate_error!(error, mapping: ERROR_MAPPING)
case error
when RedisClient::Cluster::ErrorCollection
error.errors.each do |_node, node_error|
if node_error.is_a?(RedisClient::AuthenticationError)
raise mapping.fetch(node_error.class), node_error.message, node_error.backtrace
end
end
remapped_node_errors = error.errors.map do |node_key, node_error|
remapped = mapping.fetch(node_error.class, node_error.class).new(node_error.message)
remapped.set_backtrace node_error.backtrace
[node_key, remapped]
end.to_h
raise(Redis::Cluster::CommandErrorCollection.new(remapped_node_errors, error.message).tap do |remapped|
remapped.set_backtrace error.backtrace
end)
else
Redis::Client.translate_error!(error, mapping: mapping)
end
end
end
def initialize(*)
handle_errors { super }
end
ruby2_keywords :initialize if respond_to?(:ruby2_keywords, true)
def id
server_url.join(' ')
end
def server_url
@router.nil? ? @config.startup_nodes.keys : router.node_keys
end
def connected?
true
end
def disable_reconnection
yield # TODO: do we need this, is it doable?
end
def timeout
config.read_timeout
end
def db
0
end
undef_method :call
undef_method :call_once
undef_method :call_once_v
undef_method :blocking_call
def call_v(command, &block)
handle_errors { super(command, &block) }
end
def blocking_call_v(timeout, command, &block)
timeout += self.timeout if timeout && timeout > 0
handle_errors { super(timeout, command, &block) }
end
def pipelined(exception: true, &block)
handle_errors { super(exception: exception, &block) }
end
def multi(watch: nil, &block)
handle_errors { super(watch: watch, &block) }
end
def watch(*keys, &block)
unless block_given?
raise(
Redis::Cluster::TransactionConsistencyError,
'A block is required if you use the cluster client.'
)
end
unless block.arity == 1
raise(
Redis::Cluster::TransactionConsistencyError,
'Given block needs an argument if you use the cluster client.'
)
end
handle_errors do
RedisClient::Cluster::OptimisticLocking.new(router).watch(keys) do |c, slot, asking|
transaction = Redis::Cluster::TransactionAdapter.new(
self, router, @command_builder, node: c, slot: slot, asking: asking
)
result = yield transaction
c.call('UNWATCH') unless transaction.lock_released?
result
end
end
end
private
def handle_errors
yield
rescue ::RedisClient::Error => error
Redis::Cluster::Client.translate_error!(error)
end
end
end
end
|