1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
|
# frozen_string_literal: true
require 'redis_client'
require 'redis_client/cluster/errors'
class RedisClient
class Cluster
class PubSub
class State
IO_ERROR_NEVER = { IOError => :never }.freeze
IO_ERROR_IMMEDIATE = { IOError => :immediate }.freeze
private_constant :IO_ERROR_NEVER, :IO_ERROR_IMMEDIATE
def initialize(client, queue)
@client = client
@worker = nil
@queue = queue
end
def call(command)
@client.call_v(command)
end
def ensure_worker
@worker = spawn_worker(@client, @queue) unless @worker&.alive?
end
def close
if @worker&.alive?
@worker.exit
@worker.join
end
@client.close
rescue ::RedisClient::ConnectionError
# ignore
end
private
def spawn_worker(client, queue)
# Ruby VM allocates 1 MB memory as a stack for a thread.
# It is a fixed size but we can modify the size with some environment variables.
# So it consumes memory 1 MB multiplied a number of workers.
Thread.new(client, queue, nil) do |pubsub, q, prev_err|
Thread.handle_interrupt(IO_ERROR_NEVER) do
loop do
Thread.handle_interrupt(IO_ERROR_IMMEDIATE) { q << pubsub.next_event }
prev_err = nil
rescue StandardError => e
next sleep 0.005 if e.instance_of?(prev_err.class) && e.message == prev_err&.message
Thread.handle_interrupt(IO_ERROR_IMMEDIATE) { q << e }
prev_err = e
end
end
rescue IOError
# stream closed in another thread
end
end
end
BUF_SIZE = Integer(ENV.fetch('REDIS_CLIENT_PUBSUB_BUF_SIZE', 1024))
private_constant :BUF_SIZE
def initialize(router, command_builder)
@router = router
@command_builder = command_builder
@queue = SizedQueue.new(BUF_SIZE)
@state_dict = {}
@commands = []
end
def call(*args, **kwargs)
command = @command_builder.generate(args, kwargs)
_call(command)
@commands << command
nil
end
def call_v(command)
command = @command_builder.generate(command)
_call(command)
@commands << command
nil
end
def close
@state_dict.each_value(&:close)
@state_dict.clear
@commands.clear
@queue.clear
@queue.close
nil
end
def next_event(timeout = nil) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity
@state_dict.each_value(&:ensure_worker)
max_duration = calc_max_duration(timeout)
starting = obtain_current_time
loop do
break if max_duration > 0 && obtain_current_time - starting > max_duration
case event = @queue.pop(true)
when ::RedisClient::CommandError
raise event unless event.message.start_with?('MOVED', 'CLUSTERDOWN')
break start_over
when ::RedisClient::ConnectionError then break start_over
when StandardError then raise event
when Array then break event
end
rescue ThreadError
sleep 0.005
end
end
private
def _call(command) # rubocop:disable Metrics/AbcSize
if command.first.casecmp('subscribe').zero?
call_to_single_state(command)
elsif command.first.casecmp('psubscribe').zero?
call_to_single_state(command)
elsif command.first.casecmp('ssubscribe').zero?
call_to_single_state(command)
elsif command.first.casecmp('unsubscribe').zero?
call_to_all_states(command)
elsif command.first.casecmp('punsubscribe').zero?
call_to_all_states(command)
elsif command.first.casecmp('sunsubscribe').zero?
call_for_sharded_states(command)
else
call_to_single_state(command)
end
end
def call_to_single_state(command)
node_key = @router.find_node_key(command)
handle_connection_error(node_key) do
@state_dict[node_key] ||= State.new(@router.find_node(node_key).pubsub, @queue)
@state_dict[node_key].call(command)
end
end
def call_to_all_states(command)
@state_dict.each do |node_key, state|
handle_connection_error(node_key, ignore: true) do
state.call(command)
end
end
end
def call_for_sharded_states(command)
if command.size == 1
call_to_all_states(command)
else
call_to_single_state(command)
end
end
def obtain_current_time
Process.clock_gettime(Process::CLOCK_MONOTONIC, :microsecond)
end
def calc_max_duration(timeout)
timeout.nil? || timeout < 0 ? 0 : timeout * 1_000_000
end
def handle_connection_error(node_key, ignore: false)
yield
rescue ::RedisClient::ConnectionError
@state_dict[node_key]&.close
@state_dict.delete(node_key)
@router.renew_cluster_state
raise unless ignore
end
def start_over
loop do
@router.renew_cluster_state
@state_dict.each_value(&:close)
@state_dict.clear
@commands.each { |command| _call(command) }
break
rescue ::RedisClient::ConnectionError, ::RedisClient::Cluster::NodeMightBeDown
sleep 1.0
end
end
end
end
end
|