File: pipeline.rb

package info (click to toggle)
ruby-redis-cluster-client 0.10.0-1.1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 208 kB
  • sloc: ruby: 2,214; makefile: 4
file content (264 lines) | stat: -rw-r--r-- 8,769 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
# frozen_string_literal: true

require 'redis_client'
require 'redis_client/cluster/errors'
require 'redis_client/connection_mixin'
require 'redis_client/middlewares'
require 'redis_client/pooled'

class RedisClient
  class Cluster
    class Pipeline
      class Extended < ::RedisClient::Pipeline
        attr_reader :outer_indices

        def initialize(...)
          super
          @outer_indices = nil
        end

        def add_outer_index(index)
          @outer_indices ||= []
          @outer_indices << index
        end

        def get_inner_index(outer_index)
          @outer_indices&.find_index(outer_index)
        end

        def get_callee_method(inner_index)
          if @timeouts.is_a?(Array) && !@timeouts[inner_index].nil?
            :blocking_call_v
          elsif _retryable?
            :call_once_v
          else
            :call_v
          end
        end

        def get_command(inner_index)
          @commands.is_a?(Array) ? @commands[inner_index] : nil
        end

        def get_timeout(inner_index)
          @timeouts.is_a?(Array) ? @timeouts[inner_index] : nil
        end

        def get_block(inner_index)
          @blocks.is_a?(Array) ? @blocks[inner_index] : nil
        end
      end

      ::RedisClient::ConnectionMixin.module_eval do
        def call_pipelined_aware_of_redirection(commands, timeouts, exception:) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
          size = commands.size
          results = Array.new(commands.size)
          @pending_reads += size
          write_multi(commands)

          redirection_indices = nil
          first_exception = nil
          size.times do |index|
            timeout = timeouts && timeouts[index]
            result = read(timeout)
            @pending_reads -= 1
            if result.is_a?(::RedisClient::Error)
              result._set_command(commands[index])
              if result.is_a?(::RedisClient::CommandError) && result.message.start_with?('MOVED', 'ASK')
                redirection_indices ||= []
                redirection_indices << index
              elsif exception
                first_exception ||= result
              end
            end
            results[index] = result
          end

          raise first_exception if exception && first_exception
          return results if redirection_indices.nil?

          err = ::RedisClient::Cluster::Pipeline::RedirectionNeeded.new
          err.replies = results
          err.indices = redirection_indices
          raise err
        end
      end

      ::RedisClient.class_eval do
        attr_reader :middlewares

        def ensure_connected_cluster_scoped(retryable: true, &block)
          ensure_connected(retryable: retryable, &block)
        end
      end

      ReplySizeError = Class.new(::RedisClient::Error)

      class RedirectionNeeded < ::RedisClient::Error
        attr_accessor :replies, :indices
      end

      def initialize(router, command_builder, concurrent_worker, exception:, seed: Random.new_seed)
        @router = router
        @command_builder = command_builder
        @concurrent_worker = concurrent_worker
        @exception = exception
        @seed = seed
        @pipelines = nil
        @size = 0
      end

      def call(*args, **kwargs, &block)
        command = @command_builder.generate(args, kwargs)
        node_key = @router.find_node_key(command, seed: @seed)
        append_pipeline(node_key).call_v(command, &block)
      end

      def call_v(args, &block)
        command = @command_builder.generate(args)
        node_key = @router.find_node_key(command, seed: @seed)
        append_pipeline(node_key).call_v(command, &block)
      end

      def call_once(*args, **kwargs, &block)
        command = @command_builder.generate(args, kwargs)
        node_key = @router.find_node_key(command, seed: @seed)
        append_pipeline(node_key).call_once_v(command, &block)
      end

      def call_once_v(args, &block)
        command = @command_builder.generate(args)
        node_key = @router.find_node_key(command, seed: @seed)
        append_pipeline(node_key).call_once_v(command, &block)
      end

      def blocking_call(timeout, *args, **kwargs, &block)
        command = @command_builder.generate(args, kwargs)
        node_key = @router.find_node_key(command, seed: @seed)
        append_pipeline(node_key).blocking_call_v(timeout, command, &block)
      end

      def blocking_call_v(timeout, args, &block)
        command = @command_builder.generate(args)
        node_key = @router.find_node_key(command, seed: @seed)
        append_pipeline(node_key).blocking_call_v(timeout, command, &block)
      end

      def empty?
        @size.zero?
      end

      def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
        return if @pipelines.nil? || @pipelines.empty?

        work_group = @concurrent_worker.new_group(size: @pipelines.size)

        @pipelines.each do |node_key, pipeline|
          work_group.push(node_key, @router.find_node(node_key), pipeline) do |cli, pl|
            replies = do_pipelining(cli, pl)
            raise ReplySizeError, "commands: #{pl._size}, replies: #{replies.size}" if pl._size != replies.size

            replies
          end
        end

        all_replies = errors = required_redirections = nil

        work_group.each do |node_key, v|
          case v
          when ::RedisClient::Cluster::Pipeline::RedirectionNeeded
            required_redirections ||= {}
            required_redirections[node_key] = v
          when StandardError
            errors ||= {}
            errors[node_key] = v
          else
            all_replies ||= Array.new(@size)
            @pipelines[node_key].outer_indices.each_with_index { |outer, inner| all_replies[outer] = v[inner] }
          end
        end

        work_group.close
        raise ::RedisClient::Cluster::ErrorCollection, errors unless errors.nil?

        required_redirections&.each do |node_key, v|
          all_replies ||= Array.new(@size)
          pipeline = @pipelines[node_key]
          v.indices.each { |i| v.replies[i] = handle_redirection(v.replies[i], pipeline, i) }
          pipeline.outer_indices.each_with_index { |outer, inner| all_replies[outer] = v.replies[inner] }
        end

        all_replies
      end

      private

      def append_pipeline(node_key)
        @pipelines ||= {}
        @pipelines[node_key] ||= ::RedisClient::Cluster::Pipeline::Extended.new(@command_builder)
        @pipelines[node_key].add_outer_index(@size)
        @size += 1
        @pipelines[node_key]
      end

      def do_pipelining(client, pipeline)
        case client
        when ::RedisClient then send_pipeline(client, pipeline)
        when ::RedisClient::Pooled then client.with { |cli| send_pipeline(cli, pipeline) }
        else raise NotImplementedError, "#{client.class.name}#pipelined for cluster client"
        end
      end

      def send_pipeline(client, pipeline)
        results = client.ensure_connected_cluster_scoped(retryable: pipeline._retryable?) do |connection|
          commands = pipeline._commands
          client.middlewares.call_pipelined(commands, client.config) do
            connection.call_pipelined_aware_of_redirection(commands, pipeline._timeouts, exception: @exception)
          end
        end

        pipeline._coerce!(results)
      end

      def handle_redirection(err, pipeline, inner_index)
        return err unless err.is_a?(::RedisClient::CommandError)

        if err.message.start_with?('MOVED')
          node = @router.assign_redirection_node(err.message)
          try_redirection(node, pipeline, inner_index)
        elsif err.message.start_with?('ASK')
          node = @router.assign_asking_node(err.message)
          try_asking(node) ? try_redirection(node, pipeline, inner_index) : err
        else
          err
        end
      end

      def try_redirection(node, pipeline, inner_index)
        redirect_command(node, pipeline, inner_index)
      rescue StandardError => e
        @exception ? raise : e
      end

      def redirect_command(node, pipeline, inner_index)
        method = pipeline.get_callee_method(inner_index)
        command = pipeline.get_command(inner_index)
        timeout = pipeline.get_timeout(inner_index)
        block = pipeline.get_block(inner_index)
        args = timeout.nil? ? [] : [timeout]

        if block.nil?
          @router.try_send(node, method, command, args)
        else
          @router.try_send(node, method, command, args, &block)
        end
      end

      def try_asking(node)
        node.call('ASKING') == 'OK'
      rescue StandardError
        false
      end
    end
  end
end