1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
|
# frozen_string_literal: true
require_relative "../../test/helper"
$LOAD_PATH.unshift(File.expand_path('../lib', __dir__))
require "redis-clustering"
require_relative 'support/orchestrator'
module Helper
module Cluster
include Generic
DEFAULT_HOST = '127.0.0.1'
DEFAULT_PORTS = (16_380..16_385).freeze
ClusterSlotsRawReply = lambda { |host, port|
# @see https://redis.io/topics/protocol
<<-REPLY.delete(' ')
*1\r
*4\r
:0\r
:16383\r
*3\r
$#{host.size}\r
#{host}\r
:#{port}\r
$40\r
649fa246273043021a05f547a79478597d3f1dc5\r
*3\r
$#{host.size}\r
#{host}\r
:#{port}\r
$40\r
649fa246273043021a05f547a79478597d3f1dc5\r
REPLY
}
ClusterNodesRawReply = lambda { |host, port|
line = "649fa246273043021a05f547a79478597d3f1dc5 #{host}:#{port}@17000 "\
'myself,master - 0 1530797742000 1 connected 0-16383'
"$#{line.size}\r\n#{line}\r\n"
}
def init(redis)
redis.flushall
redis
rescue Redis::CannotConnectError
puts <<-MSG
Cannot connect to Redis Cluster.
Make sure Redis is running on localhost, port #{DEFAULT_PORTS}.
Try this once:
$ make stop_cluster
Then run the build again:
$ make
MSG
exit! 1
end
def build_another_client(options = {})
_new_client(options)
end
def redis_cluster_mock(commands, options = {})
host = DEFAULT_HOST
port = nil
cluster_subcommands = if commands.key?(:cluster)
commands.delete(:cluster).transform_keys { |k| k.to_s.downcase }
else
{}
end
commands[:cluster] = lambda { |subcommand, *args|
subcommand = subcommand.downcase
if cluster_subcommands.key?(subcommand)
cluster_subcommands[subcommand].call(*args)
else
case subcommand.downcase
when 'slots' then ClusterSlotsRawReply.call(host, port)
when 'nodes' then ClusterNodesRawReply.call(host, port)
else '+OK'
end
end
}
commands[:command] = ->(*_) { "*0\r\n" }
RedisMock.start(commands, options) do |po|
port = po
scheme = options[:ssl] ? 'rediss' : 'redis'
nodes = %W[#{scheme}://#{host}:#{port}]
yield _new_client(options.merge(nodes: nodes))
end
end
def redis_cluster_down
trib = ClusterOrchestrator.new(_default_nodes, timeout: TIMEOUT)
trib.down
yield
ensure
trib.rebuild
trib.close
end
def redis_cluster_failover
trib = ClusterOrchestrator.new(_default_nodes, timeout: TIMEOUT)
trib.failover
yield
ensure
trib.rebuild
trib.close
end
def redis_cluster_fail_master
trib = ClusterOrchestrator.new(_default_nodes, timeout: TIMEOUT)
trib.fail_serving_master
yield
ensure
trib.restart_cluster_nodes
trib.rebuild
trib.close
end
# @param slot [Integer]
# @param src [String] <ip>:<port>
# @param dest [String] <ip>:<port>
def redis_cluster_resharding(slot, src:, dest:)
trib = ClusterOrchestrator.new(_default_nodes, timeout: TIMEOUT)
trib.start_resharding(slot, src, dest)
yield
trib.finish_resharding(slot, dest)
ensure
trib.rebuild
trib.close
end
private
def _default_nodes(host: DEFAULT_HOST, ports: DEFAULT_PORTS)
ports.map { |port| "redis://#{host}:#{port}" }
end
def _format_options(options)
{
timeout: OPTIONS[:timeout],
nodes: _default_nodes
}.merge(options)
end
def _new_client(options = {})
Redis::Cluster.new(_format_options(options).merge(driver: ENV['DRIVER']))
end
end
end
|