File: helper.rb

package info (click to toggle)
ruby-redis 5.3.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,160 kB
  • sloc: ruby: 11,445; makefile: 117; sh: 24
file content (161 lines) | stat: -rw-r--r-- 3,862 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# frozen_string_literal: true

require_relative "../../test/helper"
$LOAD_PATH.unshift(File.expand_path('../lib', __dir__))

require "redis-clustering"
require_relative 'support/orchestrator'

module Helper
  module Cluster
    include Generic

    # Address used for every default cluster node URL and for the mock server
    # started by redis_cluster_mock.
    DEFAULT_HOST = '127.0.0.1'
    # Ports of the default cluster nodes; _default_nodes builds one node URL
    # per port in this range.
    DEFAULT_PORTS = (16_380..16_385).freeze

    # Builds the raw RESP text the mock returns for `CLUSTER SLOTS`.
    #
    # The reply holds a single slot range (0..16383) followed by two node
    # entries that both advertise the given host/port under a fixed node id.
    #
    # @see https://redis.io/topics/protocol
    # @param host [String] address advertised for the node
    # @param port [Integer, nil] port advertised for the node
    # @return [String] reply in which every frame is terminated by CRLF
    ClusterSlotsRawReply = lambda { |host, port|
      node_entry = [
        '*3',
        "$#{host.size}",
        host,
        ":#{port}",
        '$40',
        '649fa246273043021a05f547a79478597d3f1dc5'
      ]
      frames = ['*1', '*4', ':0', ':16383', *node_entry, *node_entry]

      # Every space is removed from the assembled payload, so a host that
      # contains spaces is emitted without them.
      frames.map { |frame| "#{frame}\r\n" }.join.delete(' ')
    }

    # Builds the raw RESP bulk string the mock returns for `CLUSTER NODES`.
    #
    # The payload is a single node line that marks the given host/port as
    # `myself,master` owning slots 0-16383.
    #
    # @param host [String] address advertised for the node
    # @param port [Integer, nil] port advertised for the node
    # @return [String] bulk string: `$<length>` header, the line, CRLF after each
    ClusterNodesRawReply = lambda { |host, port|
      fields = [
        '649fa246273043021a05f547a79478597d3f1dc5',
        "#{host}:#{port}@17000",
        'myself,master',
        '-',
        '0',
        '1530797742000',
        '1',
        'connected',
        '0-16383'
      ]
      payload = fields.join(' ')
      "$#{payload.size}\r\n#{payload}\r\n"
    }

    # Calls +flushall+ on the given client and hands the same client back.
    #
    # When Redis::CannotConnectError is raised, prints setup instructions and
    # terminates the process immediately with exit status 1.
    #
    # @param redis [#flushall] client to reset
    # @return [Object] the +redis+ object that was passed in
    def init(redis)
      redis.tap(&:flushall)
    rescue Redis::CannotConnectError
      instructions = <<-MSG

        Cannot connect to Redis Cluster.

        Make sure Redis is running on localhost, port #{DEFAULT_PORTS}.

        Try this once:

          $ make stop_cluster

        Then run the build again:

          $ make

      MSG
      puts instructions
      exit! 1
    end

    # Creates an additional cluster client by delegating to _new_client.
    #
    # @param options [Hash] client options, forwarded unchanged to _new_client
    # @return [Redis::Cluster] the client built by _new_client
    def build_another_client(options = {})
      _new_client(options)
    end

    # Starts a RedisMock server that answers like a single-node cluster and
    # yields a cluster client pointed at it.
    #
    # `CLUSTER SLOTS` and `CLUSTER NODES` get canned replies that advertise the
    # mock itself, any other CLUSTER subcommand answers `+OK`, and `COMMAND`
    # answers an empty array. Handlers supplied under `commands[:cluster]`
    # (subcommand name => callable, matched case-insensitively) take precedence
    # over the canned replies.
    #
    # The caller's +commands+ hash is not modified, so the same hash can be
    # passed to several calls.
    #
    # @param commands [Hash] command name => handler, handed to RedisMock.start
    # @param options [Hash] handed to RedisMock.start and merged into the
    #   client options; a truthy `:ssl` switches the node URL to `rediss://`
    # @yieldparam client [Redis::Cluster] client built by _new_client
    # @return [Object] whatever RedisMock.start returns
    def redis_cluster_mock(commands, options = {})
      host = DEFAULT_HOST
      # Assigned inside the RedisMock.start block; the :cluster handler below
      # closes over this variable and reads it each time it is invoked.
      port = nil

      # Work on a copy: :cluster is replaced and :command is added below, and
      # doing that to the caller's hash would break a second call that reuses it.
      commands = commands.dup

      cluster_subcommands = if commands.key?(:cluster)
        commands.delete(:cluster).transform_keys { |k| k.to_s.downcase }
      else
        {}
      end

      commands[:cluster] = lambda { |subcommand, *args|
        subcommand = subcommand.downcase
        if cluster_subcommands.key?(subcommand)
          cluster_subcommands[subcommand].call(*args)
        else
          case subcommand
          when 'slots' then ClusterSlotsRawReply.call(host, port)
          when 'nodes' then ClusterNodesRawReply.call(host, port)
          else '+OK'
          end
        end
      }

      commands[:command] = ->(*_) { "*0\r\n" }

      RedisMock.start(commands, options) do |po|
        port = po
        scheme = options[:ssl] ? 'rediss' : 'redis'
        nodes = %W[#{scheme}://#{host}:#{port}]
        yield _new_client(options.merge(nodes: nodes))
      end
    end

    # Runs the given block after calling ClusterOrchestrator#down on the
    # default nodes, then rebuilds the cluster and closes the orchestrator.
    #
    # The orchestrator is created outside the protected section so that a
    # failure to construct it propagates as-is instead of being replaced by a
    # NoMethodError on nil during cleanup. `close` runs even when `rebuild`
    # raises, so the orchestrator is not leaked.
    #
    # @yield runs between `down` and the cleanup
    # @return [Object] the value of the block
    def redis_cluster_down
      trib = ClusterOrchestrator.new(_default_nodes, timeout: TIMEOUT)
      begin
        trib.down
        yield
      ensure
        begin
          trib.rebuild
        ensure
          trib.close
        end
      end
    end

    # Runs the given block after calling ClusterOrchestrator#failover on the
    # default nodes, then rebuilds the cluster and closes the orchestrator.
    #
    # The orchestrator is created outside the protected section so that a
    # failure to construct it propagates as-is instead of being replaced by a
    # NoMethodError on nil during cleanup. `close` runs even when `rebuild`
    # raises, so the orchestrator is not leaked.
    #
    # @yield runs between `failover` and the cleanup
    # @return [Object] the value of the block
    def redis_cluster_failover
      trib = ClusterOrchestrator.new(_default_nodes, timeout: TIMEOUT)
      begin
        trib.failover
        yield
      ensure
        begin
          trib.rebuild
        ensure
          trib.close
        end
      end
    end

    # Runs the given block after calling
    # ClusterOrchestrator#fail_serving_master on the default nodes, then
    # restarts the cluster nodes, rebuilds the cluster and closes the
    # orchestrator.
    #
    # The orchestrator is created outside the protected section so that a
    # failure to construct it propagates as-is instead of being replaced by a
    # NoMethodError on nil during cleanup. `close` runs even when the restart
    # or the rebuild raises, so the orchestrator is not leaked.
    #
    # @yield runs between `fail_serving_master` and the cleanup
    # @return [Object] the value of the block
    def redis_cluster_fail_master
      trib = ClusterOrchestrator.new(_default_nodes, timeout: TIMEOUT)
      begin
        trib.fail_serving_master
        yield
      ensure
        begin
          trib.restart_cluster_nodes
          trib.rebuild
        ensure
          trib.close
        end
      end
    end

    # Runs the given block while +slot+ is being resharded from +src+ to
    # +dest+: starts the resharding, yields, finishes the resharding, then
    # rebuilds the cluster and closes the orchestrator.
    #
    # When the block raises, `finish_resharding` is skipped but the cleanup
    # still runs. The orchestrator is created outside the protected section so
    # that a failure to construct it propagates as-is instead of being
    # replaced by a NoMethodError on nil during cleanup. `close` runs even
    # when `rebuild` raises, so the orchestrator is not leaked.
    #
    # @param slot [Integer]
    # @param src [String] <ip>:<port>
    # @param dest [String] <ip>:<port>
    # @yield runs between `start_resharding` and `finish_resharding`
    def redis_cluster_resharding(slot, src:, dest:)
      trib = ClusterOrchestrator.new(_default_nodes, timeout: TIMEOUT)
      begin
        trib.start_resharding(slot, src, dest)
        yield
        trib.finish_resharding(slot, dest)
      ensure
        begin
          trib.rebuild
        ensure
          trib.close
        end
      end
    end

    private

    # Builds one `redis://host:port` URL per port.
    #
    # @param host [String] host shared by every node URL
    # @param ports [Enumerable<Integer>] ports to build URLs for
    # @return [Array<String>] node URLs, in the order of +ports+
    def _default_nodes(host: DEFAULT_HOST, ports: DEFAULT_PORTS)
      ports.map do |node_port|
        format('redis://%<host>s:%<port>s', host: host, port: node_port)
      end
    end

    # Layers the caller's options over the default timeout and node list.
    #
    # @param options [Hash] overrides; its entries win over the defaults
    # @return [Hash] a new hash with `:timeout` and `:nodes` filled in
    def _format_options(options)
      defaults = { timeout: OPTIONS[:timeout], nodes: _default_nodes }
      defaults.merge(options)
    end

    # Creates a Redis::Cluster client from the formatted options.
    #
    # The `:driver` entry is always taken from the DRIVER environment variable
    # and replaces any `:driver` given in +options+ (nil when DRIVER is unset).
    #
    # @param options [Hash] client options, completed by _format_options
    # @return [Redis::Cluster]
    def _new_client(options = {})
      settings = _format_options(options)
      settings = settings.merge(driver: ENV['DRIVER'])
      Redis::Cluster.new(settings)
    end
  end
end