File: client.rb

package info (click to toggle)
ruby-redis-clustering 5.4.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 100 kB
  • sloc: ruby: 256; makefile: 7
file content (139 lines) | stat: -rw-r--r-- 4,168 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# frozen_string_literal: true

require 'redis-cluster-client'
require 'redis/cluster/transaction_adapter'

class Redis
  class Cluster
    class Client < RedisClient::Cluster
      ERROR_MAPPING = ::Redis::Client::ERROR_MAPPING.merge(
        RedisClient::Cluster::InitialSetupError => Redis::Cluster::InitialSetupError,
        RedisClient::Cluster::OrchestrationCommandNotSupported => Redis::Cluster::OrchestrationCommandNotSupported,
        RedisClient::Cluster::AmbiguousNodeError => Redis::Cluster::AmbiguousNodeError,
        RedisClient::Cluster::ErrorCollection => Redis::Cluster::CommandErrorCollection,
        RedisClient::Cluster::Transaction::ConsistencyError => Redis::Cluster::TransactionConsistencyError,
        RedisClient::Cluster::NodeMightBeDown => Redis::Cluster::NodeMightBeDown,
      )

      class << self
        def config(**kwargs)
          super(protocol: 2, **kwargs)
        end

        def sentinel(**kwargs)
          super(protocol: 2, **kwargs)
        end

        def translate_error!(error, mapping: ERROR_MAPPING)
          case error
          when RedisClient::Cluster::ErrorCollection
            error.errors.each do |_node, node_error|
              if node_error.is_a?(RedisClient::AuthenticationError)
                raise mapping.fetch(node_error.class), node_error.message, node_error.backtrace
              end
            end

            remapped_node_errors = error.errors.map do |node_key, node_error|
              remapped = mapping.fetch(node_error.class, node_error.class).new(node_error.message)
              remapped.set_backtrace node_error.backtrace
              [node_key, remapped]
            end.to_h

            raise(Redis::Cluster::CommandErrorCollection.new(remapped_node_errors, error.message).tap do |remapped|
              remapped.set_backtrace error.backtrace
            end)
          else
            Redis::Client.translate_error!(error, mapping: mapping)
          end
        end
      end

      def initialize(*)
        handle_errors { super }
      end
      ruby2_keywords :initialize if respond_to?(:ruby2_keywords, true)

      def id
        server_url.join(' ')
      end

      def server_url
        @router.nil? ? @config.startup_nodes.keys : router.node_keys
      end

      def connected?
        true
      end

      def disable_reconnection
        yield # TODO: do we need this, is it doable?
      end

      def timeout
        config.read_timeout
      end

      def db
        0
      end

      undef_method :call
      undef_method :call_once
      undef_method :call_once_v
      undef_method :blocking_call

      def call_v(command, &block)
        handle_errors { super(command, &block) }
      end

      def blocking_call_v(timeout, command, &block)
        timeout += self.timeout if timeout && timeout > 0
        handle_errors { super(timeout, command, &block) }
      end

      def pipelined(exception: true, &block)
        handle_errors { super(exception: exception, &block) }
      end

      def multi(watch: nil, &block)
        handle_errors { super(watch: watch, &block) }
      end

      def watch(*keys, &block)
        unless block_given?
          raise(
            Redis::Cluster::TransactionConsistencyError,
            'A block is required if you use the cluster client.'
          )
        end

        unless block.arity == 1
          raise(
            Redis::Cluster::TransactionConsistencyError,
            'Given block needs an argument if you use the cluster client.'
          )
        end

        handle_errors do
          RedisClient::Cluster::OptimisticLocking.new(router).watch(keys) do |c, slot, asking|
            transaction = Redis::Cluster::TransactionAdapter.new(
              self, router, @command_builder, node: c, slot: slot, asking: asking
            )

            result = yield transaction
            c.call('UNWATCH') unless transaction.lock_released?
            result
          end
        end
      end

      private

      def handle_errors
        yield
      rescue ::RedisClient::Error => error
        Redis::Cluster::Client.translate_error!(error)
      end
    end
  end
end