File: pub_sub.rb

package info (click to toggle)
ruby-redis-cluster-client 0.13.6-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 224 kB
  • sloc: ruby: 2,498; makefile: 4
file content (194 lines) | stat: -rw-r--r-- 5,653 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# frozen_string_literal: true

require 'redis_client'
require 'redis_client/cluster/errors'

class RedisClient
  class Cluster
    class PubSub
      # Pairs one pub/sub connection with the background thread that reads from it.
      # Everything the thread reads (events as well as errors) is pushed into the
      # queue handed to the constructor.
      class State
        # Masks for Thread.handle_interrupt: the first defers asynchronous IOError
        # delivery, the second lets it through immediately.
        IO_ERROR_NEVER = { IOError => :never }.freeze
        IO_ERROR_IMMEDIATE = { IOError => :immediate }.freeze
        private_constant :IO_ERROR_NEVER, :IO_ERROR_IMMEDIATE

        # @param client [#call_v, #next_event, #close] connection used for commands and events
        # @param queue [#<<] destination for the events and errors read by the worker
        def initialize(client, queue)
          @queue = queue
          @client = client
          @worker = nil
        end

        # Sends a command through the connection.
        #
        # @param command [Array] the command to send
        # @return the result of the connection's +call_v+
        def call(command)
          @client.call_v(command)
        end

        # Starts the reader thread unless one is already alive.
        #
        # @return [Thread, nil] the new thread, or nil when the current one is still running
        def ensure_worker
          return if @worker&.alive?

          @worker = spawn_worker(@client, @queue)
        end

        # Stops the reader thread (waiting for it to finish) and closes the connection.
        # A connection error raised while closing is swallowed on purpose.
        def close
          @worker.exit.join if @worker&.alive?
          @client.close
        rescue ::RedisClient::ConnectionError
          nil
        end

        private

        # Spawns the thread that forwards events from the connection into the queue.
        #
        # @param client [#next_event] source of events
        # @param queue [#<<] sink for events and errors
        # @return [Thread]
        def spawn_worker(client, queue)
          # The Ruby VM allocates 1 MB of memory as the stack of each thread.
          # The size is fixed, although it can be tuned with some environment variables,
          # so memory consumption is 1 MB multiplied by the number of workers.
          Thread.new(client, queue, nil) do |conn, events, last_error|
            Thread.handle_interrupt(IO_ERROR_NEVER) do
              loop do
                Thread.handle_interrupt(IO_ERROR_IMMEDIATE) { events << conn.next_event }
                last_error = nil
              rescue StandardError => e
                # An error identical to the previous one is not queued again;
                # the worker just backs off briefly before reading once more.
                repeated = e.instance_of?(last_error.class) && e.message == last_error&.message
                if repeated
                  sleep 0.005
                  next
                end

                Thread.handle_interrupt(IO_ERROR_IMMEDIATE) { events << e }
                last_error = e
              end
            end
          rescue IOError
            # stream closed in another thread
          end
        end
      end

      # Capacity of the SizedQueue that buffers incoming events. It can be overridden
      # through the REDIS_CLIENT_PUBSUB_BUF_SIZE environment variable; the default is 1024.
      BUF_SIZE = Integer(ENV.fetch('REDIS_CLIENT_PUBSUB_BUF_SIZE', 1024))
      private_constant :BUF_SIZE

      # @param router used to resolve node keys/nodes and to renew the cluster state
      # @param command_builder [#generate] turns caller arguments into commands
      def initialize(router, command_builder)
        # Collaborators supplied by the caller.
        @command_builder = command_builder
        @router = router

        # Internal bookkeeping: per-node states, issued commands and the bounded event buffer.
        @state_dict = {}
        @commands = []
        @queue = SizedQueue.new(BUF_SIZE)
      end

      # Builds a command from the given arguments, dispatches it and, once the
      # dispatch has succeeded, records it in the list of issued commands.
      #
      # @return [nil]
      def call(*args, **kwargs)
        built = @command_builder.generate(args, kwargs)
        _call(built)
        @commands.push(built)
        nil
      end

      # Same as #call, but takes the command as a single array-like argument.
      # The command is recorded only after it has been dispatched successfully.
      #
      # @param command the raw command handed to the command builder
      # @return [nil]
      def call_v(command)
        built = @command_builder.generate(command)
        _call(built)
        @commands.push(built)
        nil
      end

      # Closes every node state, forgets the states and the recorded commands,
      # then empties and closes the event queue.
      #
      # @return [nil]
      def close
        @state_dict.each_value { |state| state.close }
        [@state_dict, @commands, @queue].each(&:clear)
        @queue.close
        nil
      end

      # Polls the event queue until something actionable shows up.
      #
      # Every known state gets its worker (re)started first. Arrays taken from the
      # queue are returned as events. A command error whose message starts with
      # MOVED or CLUSTERDOWN, or any connection error, triggers #start_over and its
      # result is returned. Any other error taken from the queue is raised.
      #
      # @param timeout [Numeric, nil] seconds to wait; nil, zero or a negative value means no deadline
      # @return [Array, nil] the event, or nil when the deadline has passed
      def next_event(timeout = nil) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity
        @state_dict.each_value(&:ensure_worker)
        limit = calc_max_duration(timeout)
        began_at = obtain_current_time

        loop do
          # A limit of zero disables the deadline; the clock is only read when one is set.
          if limit > 0
            elapsed = obtain_current_time - began_at
            break if elapsed > limit
          end

          event = @queue.pop(true)
          case event
          when ::RedisClient::CommandError
            break start_over if event.message.start_with?('MOVED', 'CLUSTERDOWN')

            raise event
          when ::RedisClient::ConnectionError
            break start_over
          when StandardError
            raise event
          when Array
            break event
          end
        rescue ThreadError
          # The non-blocking pop found nothing: back off briefly, then poll again.
          sleep 0.005
        end
      end

      private

      # Routes a command according to its (case-insensitive) name:
      # UNSUBSCRIBE and PUNSUBSCRIBE go to every state, SUNSUBSCRIBE goes through
      # the sharded routing, and everything else is sent to a single state.
      #
      # @param command [Array] built command whose first element is the command name
      def _call(command)
        name = command.first

        if %w[unsubscribe punsubscribe].any? { |verb| name.casecmp(verb).zero? }
          call_to_all_states(command)
        elsif name.casecmp('sunsubscribe').zero?
          call_for_sharded_states(command)
        else
          call_to_single_state(command)
        end
      end

      # Sends the command to the state of the node the router picks for it,
      # creating that state on first use.
      #
      # @param command [Array] built command
      # @return the result of the state's +call+
      def call_to_single_state(command)
        key = @router.find_node_key(command)

        handle_connection_error(key) do
          state = (@state_dict[key] ||= State.new(@router.find_node(key).pubsub, @queue))
          state.call(command)
        end
      end

      # Sends the command to every known state. A connection error on one
      # node is handled and ignored so that the remaining nodes are still reached.
      #
      # @param command [Array] built command
      def call_to_all_states(command)
        @state_dict.each_pair do |key, node_state|
          handle_connection_error(key, ignore: true) { node_state.call(command) }
        end
      end

      # Routes a sharded unsubscribe: a bare command (name only, no arguments)
      # is sent to every state, otherwise it goes to a single state.
      #
      # @param command [Array] built command
      def call_for_sharded_states(command)
        return call_to_all_states(command) if command.size == 1

        call_to_single_state(command)
      end

      # Reads the monotonic clock.
      #
      # @return [Integer] current monotonic time in microseconds
      def obtain_current_time
        clock_id = Process::CLOCK_MONOTONIC
        Process.clock_gettime(clock_id, :microsecond)
      end

      # Converts a timeout in seconds into microseconds.
      #
      # @param timeout [Numeric, nil] seconds
      # @return [Numeric] microseconds, or 0 when the timeout is nil or negative
      def calc_max_duration(timeout)
        return 0 if timeout.nil? || timeout < 0

        timeout * 1_000_000
      end

      # Runs the block; when it raises a connection error, the state registered for
      # +node_key+ is closed and removed and the router renews the cluster state.
      #
      # @param node_key key of the state to discard on failure
      # @param ignore [Boolean] when true the connection error is swallowed and nil is returned
      # @return the block's value
      # @raise [::RedisClient::ConnectionError] re-raised unless +ignore+ is set
      def handle_connection_error(node_key, ignore: false)
        yield
      rescue ::RedisClient::ConnectionError
        broken = @state_dict[node_key]
        broken.close unless broken.nil?
        @state_dict.delete(node_key)
        @router.renew_cluster_state
        return if ignore

        raise
      end

      # Rebuilds every subscription from scratch: renews the cluster state, closes and
      # drops all node states, then replays each recorded command. While a connection
      # error or a node-might-be-down error occurs, the whole sequence is retried
      # after a one second pause.
      #
      # @return [nil]
      def start_over
        loop do
          @router.renew_cluster_state
          @state_dict.each_value(&:close).clear
          @commands.each { |recorded| _call(recorded) }
          break
        rescue ::RedisClient::ConnectionError, ::RedisClient::Cluster::NodeMightBeDown
          sleep 1.0
        end
      end
    end
  end
end