File: cluster_config.rb

package info (click to toggle)
ruby-redis-cluster-client 0.13.6-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 224 kB
  • sloc: ruby: 2,498; makefile: 4
file content (208 lines) | stat: -rw-r--r-- 7,369 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# frozen_string_literal: true

require 'uri'
require 'redis_client'
require 'redis_client/cluster'
require 'redis_client/cluster/errors'
require 'redis_client/cluster/node_key'
require 'redis_client/cluster/noop_command_builder'
require 'redis_client/command_builder'

class RedisClient
  # Holds the settings used to build cluster clients: the startup nodes, the
  # per-node connection options, replica usage and concurrency options.
  #
  # Nodes may be given as URL strings (`redis://` / `rediss://`), as Hashes, or
  # as an (arbitrarily nested) Array mixing both.
  class ClusterConfig
    DEFAULT_HOST = '127.0.0.1'
    DEFAULT_PORT = 6379
    DEFAULT_SCHEME = 'redis'
    SECURE_SCHEME = 'rediss'
    DEFAULT_NODE = "#{DEFAULT_SCHEME}://#{DEFAULT_HOST}:#{DEFAULT_PORT}"
    # Make the default node string shareable between Ractors when the runtime provides that API.
    Ractor.make_shareable(DEFAULT_NODE) if Object.const_defined?(:Ractor, false) && Ractor.respond_to?(:make_shareable)
    DEFAULT_NODES = [DEFAULT_NODE].freeze
    VALID_SCHEMES = [DEFAULT_SCHEME, SECURE_SCHEME].freeze
    # Keys retained from a Hash-style node definition.
    VALID_NODES_KEYS = %i[ssl username password host port db].freeze
    # Keys copied from the first node definition into the generic client config.
    MERGE_CONFIG_KEYS = %i[ssl username password].freeze
    # Keys dropped from the generic client config because they are node specific.
    IGNORE_GENERIC_CONFIG_KEYS = %i[url host port path].freeze
    MAX_WORKERS = Integer(ENV.fetch('REDIS_CLIENT_MAX_THREADS', -1)) # for backward compatibility
    # It's used with slow queries of fetching meta data like CLUSTER NODES, COMMAND and so on.
    SLOW_COMMAND_TIMEOUT = Float(ENV.fetch('REDIS_CLIENT_SLOW_COMMAND_TIMEOUT', -1))
    # It tunes the balance between load and stability during initialization or state changes.
    MAX_STARTUP_SAMPLE = Integer(ENV.fetch('REDIS_CLIENT_MAX_STARTUP_SAMPLE', 3))

    private_constant :DEFAULT_HOST, :DEFAULT_PORT, :DEFAULT_SCHEME, :SECURE_SCHEME, :DEFAULT_NODES,
                     :VALID_SCHEMES, :VALID_NODES_KEYS, :MERGE_CONFIG_KEYS, :IGNORE_GENERIC_CONFIG_KEYS,
                     :MAX_WORKERS, :SLOW_COMMAND_TIMEOUT, :MAX_STARTUP_SAMPLE

    # Raised when the `nodes` option (or a value derived from it) cannot be interpreted.
    InvalidClientConfigError = Class.new(::RedisClient::Cluster::Error)

    attr_reader :command_builder, :client_config, :replica_affinity, :slow_command_timeout,
                :connect_with_original_config, :startup_nodes, :max_startup_sample, :id

    # @param nodes [String, Hash, Array<String, Hash>] startup node definitions
    # @param replica [Boolean] coerced to a strict boolean; exposed through {#use_replica?}
    # @param replica_affinity [Symbol, String] stored as a Symbol
    # @param fixed_hostname [String] when non-empty, replaces `:host` in every per-node config
    # @param concurrency [Hash, nil] merged over the defaults derived from `REDIS_CLIENT_MAX_THREADS`
    # @param connect_with_original_config [Boolean] stored as given
    # @param client_implementation [Class] class instantiated by {#new_pool} and {#new_client}
    # @param slow_command_timeout [Numeric] stored as given
    # @param command_builder [Object] stored as given
    # @param max_startup_sample [Integer] stored as given
    # @param client_config [Hash] remaining generic options; `url`, `host`, `port` and `path` are dropped
    # @raise [InvalidClientConfigError] when `nodes` is empty or contains an invalid entry
    def initialize( # rubocop:disable Metrics/ParameterLists
      nodes: DEFAULT_NODES,
      replica: false,
      replica_affinity: :random,
      fixed_hostname: '',
      concurrency: nil,
      connect_with_original_config: false,
      client_implementation: ::RedisClient::Cluster, # for redis gem
      slow_command_timeout: SLOW_COMMAND_TIMEOUT,
      command_builder: ::RedisClient::CommandBuilder,
      max_startup_sample: MAX_STARTUP_SAMPLE,
      **client_config
    )
      @replica = true & replica
      @replica_affinity = replica_affinity.to_s.to_sym
      @fixed_hostname = fixed_hostname.to_s
      @command_builder = command_builder
      node_configs = build_node_configs(nodes.dup)
      @client_config = merge_generic_config(client_config, node_configs)
      # Keep tabs on the original startup nodes we were constructed with
      @startup_nodes = build_startup_nodes(node_configs)
      @concurrency = merge_concurrency_option(concurrency)
      @connect_with_original_config = connect_with_original_config
      @client_implementation = client_implementation
      @slow_command_timeout = slow_command_timeout
      @max_startup_sample = max_startup_sample
      @id = client_config[:id]
    end

    # @return [String] class name followed by the startup node configs, without their command builder
    def inspect
      "#<#{self.class.name} #{startup_nodes.values.map { |v| v.reject { |k, _| k == :command_builder } }}>"
    end

    # @return [Numeric] `:connect_timeout`, else `:timeout`, else the RedisClient default
    def connect_timeout
      @client_config[:connect_timeout] || @client_config[:timeout] || ::RedisClient::Config::DEFAULT_TIMEOUT
    end

    # @return [Numeric] `:read_timeout`, else `:timeout`, else the RedisClient default
    def read_timeout
      @client_config[:read_timeout] || @client_config[:timeout] || ::RedisClient::Config::DEFAULT_TIMEOUT
    end

    # @return [Numeric] `:write_timeout`, else `:timeout`, else the RedisClient default
    def write_timeout
      @client_config[:write_timeout] || @client_config[:timeout] || ::RedisClient::Config::DEFAULT_TIMEOUT
    end

    # Instantiates the client implementation with pool settings.
    #
    # @param size [Integer] passed as `pool[:size]`
    # @param timeout [Numeric] passed as `pool[:timeout]`
    # @param kwargs [Hash] forwarded untouched to the client implementation
    def new_pool(size: 5, timeout: 5, **kwargs)
      @client_implementation.new(
        self,
        pool: { size: size, timeout: timeout },
        concurrency: @concurrency,
        **kwargs
      )
    end

    # Instantiates the client implementation without pool settings.
    #
    # @param kwargs [Hash] forwarded untouched to the client implementation
    def new_client(**kwargs)
      @client_implementation.new(self, concurrency: @concurrency, **kwargs)
    end

    # @return [Boolean] the `replica` option coerced to a boolean
    def use_replica?
      @replica
    end

    # Builds the connection options for a single node.
    #
    # @param node_key [String] key understood by `RedisClient::Cluster::NodeKey.hashify`
    # @return [Hash] generic client config merged with the node's own settings
    # @raise [InvalidClientConfigError] when the port cannot be converted to an Integer
    def client_config_for_node(node_key)
      config = ::RedisClient::Cluster::NodeKey.hashify(node_key)
      config[:port] = ensure_integer(config[:port])
      augment_client_config(config)
    end

    # NOTE(review): the three methods below return fixed values; presumably they exist to
    # mirror the interface of the single-node config object -- confirm against callers.

    # @return [true]
    def resolved?
      true
    end

    # @return [false]
    def sentinel?
      false
    end

    # @return [nil]
    def server_url
      nil
    end

    private

    # Combines the env-derived worker defaults with the user supplied option.
    # User supplied keys win; an empty result falls back to `{ model: :none }`.
    #
    # @param option [Hash, nil] non-Hash values are ignored
    # @return [Hash] frozen concurrency options
    def merge_concurrency_option(option)
      opts = {}

      if MAX_WORKERS.positive?
        opts[:model] = :on_demand
        opts[:size] = MAX_WORKERS
      end

      opts.merge!(option.transform_keys(&:to_sym)) if option.is_a?(Hash)
      opts[:model] = :none if opts.empty?
      opts.freeze
    end

    # @param addrs [String, Hash, Array] node definitions, possibly nested
    # @return [Array<Hash>] one config per non-empty definition
    # @raise [InvalidClientConfigError] when no usable definition remains
    def build_node_configs(addrs)
      configs = Array[addrs].flatten.filter_map { |addr| parse_node_addr(addr) }
      raise InvalidClientConfigError, '`nodes` option is empty' if configs.empty?

      configs
    end

    # Dispatches on the definition's type.
    #
    # @raise [InvalidClientConfigError] for anything other than a String or a Hash
    def parse_node_addr(addr)
      case addr
      when String
        parse_node_url(addr)
      when Hash
        parse_node_option(addr)
      else
        raise InvalidClientConfigError, "`nodes` option includes invalid type values: #{addr}"
      end
    end

    # Converts a node URL into a config Hash. Entries whose value is nil, an empty
    # string or false are omitted from the result.
    #
    # @param addr [String]
    # @return [Hash, nil] nil when `addr` is empty
    # @raise [InvalidClientConfigError] for unparsable URLs, unsupported schemes, opaque URIs
    #   or a non-integer database segment
    def parse_node_url(addr) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
      return if addr.empty?

      uri = URI(addr)
      scheme = uri.scheme || DEFAULT_SCHEME
      raise InvalidClientConfigError, "`nodes` option includes a invalid uri scheme: #{addr}" unless VALID_SCHEMES.include?(scheme)
      # An opaque URI such as "redis:6379" has no path (nor host/port); reject it explicitly
      # instead of failing with NoMethodError on the nil path below.
      raise InvalidClientConfigError, "`nodes` option includes an invalid uri: #{addr}" if uri.path.nil?

      username = uri.user ? URI.decode_www_form_component(uri.user) : nil
      password = uri.password ? URI.decode_www_form_component(uri.password) : nil
      host = uri.host || DEFAULT_HOST
      port = uri.port || DEFAULT_PORT
      # The first path segment, if any, is the database number.
      db = uri.path.index('/').nil? ? uri.path : uri.path.split('/')[1]
      db = db.nil? || db.empty? ? db : ensure_integer(db)

      { ssl: scheme == SECURE_SCHEME, username: username, password: password, host: host, port: port, db: db }
        .reject { |_, v| v.nil? || v == '' || v == false }
    rescue URI::InvalidURIError => e
      raise InvalidClientConfigError, "#{e.message}: #{addr}"
    end

    # Normalizes a Hash-style node definition: keys become Symbols, host and port get
    # defaults, the port is converted to an Integer and unknown keys are dropped.
    #
    # @param addr [Hash]
    # @return [Hash, nil] nil when `addr` is empty
    # @raise [InvalidClientConfigError] when the port cannot be converted to an Integer
    def parse_node_option(addr)
      return if addr.empty?

      addr = addr.transform_keys(&:to_sym)
      addr[:host] ||= DEFAULT_HOST
      addr[:port] = ensure_integer(addr[:port] || DEFAULT_PORT)
      addr.select { |k, _| VALID_NODES_KEYS.include?(k) }
    end

    # Strict Integer conversion that reports failures as a config error.
    #
    # @raise [InvalidClientConfigError] when `value` is not convertible
    def ensure_integer(value)
      Integer(value)
    rescue ArgumentError, TypeError => e
      # Integer() raises ArgumentError for malformed strings and TypeError for
      # values such as nil or an Array; both indicate a bad config value.
      raise InvalidClientConfigError, e.message
    end

    # Drops node specific keys from the generic options, then overlays the
    # ssl/username/password settings found in the first node definition.
    #
    # @param client_config [Hash]
    # @param node_configs [Array<Hash>]
    # @return [Hash]
    def merge_generic_config(client_config, node_configs)
      cfg = node_configs.first || {}
      client_config.reject { |k, _| IGNORE_GENERIC_CONFIG_KEYS.include?(k) }
                   .merge(cfg.slice(*MERGE_CONFIG_KEYS))
    end

    # @param configs [Array<Hash>]
    # @return [Hash{String => Hash}] augmented config per node key
    def build_startup_nodes(configs)
      configs.to_h do |config|
        # The key is built from the node's own host/port, before any fixed hostname is applied.
        node_key = ::RedisClient::Cluster::NodeKey.build_from_host_port(config[:host], config[:port])
        config = augment_client_config(config)
        [node_key, config]
      end
    end

    # Layers a node config over the generic client config, applies the fixed
    # hostname when one is set and installs the no-op command builder.
    #
    # @param config [Hash]
    # @return [Hash] a new Hash; `config` itself is not mutated
    def augment_client_config(config)
      config = @client_config.merge(config)
      config = config.merge(host: @fixed_hostname) unless @fixed_hostname.empty?
      config[:command_builder] = ::RedisClient::Cluster::NoopCommandBuilder # prevent twice call
      config
    end
  end
end