File: command.rb

package info (click to toggle)
ruby-redis-cluster-client 0.13.6-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 224 kB
  • sloc: ruby: 2,498; makefile: 4
file content (130 lines) | stat: -rw-r--r-- 3,979 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# frozen_string_literal: true

require 'redis_client'
require 'redis_client/cluster/errors'
require 'redis_client/cluster/key_slot_converter'

class RedisClient
  class Cluster
    class Command
      EMPTY_STRING = ''
      EMPTY_HASH = {}.freeze
      EMPTY_ARRAY = [].freeze

      private_constant :EMPTY_STRING, :EMPTY_HASH, :EMPTY_ARRAY

      Detail = Struct.new(
        'RedisCommand',
        :first_key_position,
        :key_step,
        :write?,
        :readonly?,
        keyword_init: true
      )

      class << self
        def load(nodes, slow_command_timeout: -1) # rubocop:disable Metrics/AbcSize
          cmd = errors = nil

          nodes&.each do |node|
            regular_timeout = node.read_timeout
            node.read_timeout = slow_command_timeout > 0.0 ? slow_command_timeout : regular_timeout
            reply = node.call('command')
            node.read_timeout = regular_timeout
            commands = parse_command_reply(reply)
            cmd = ::RedisClient::Cluster::Command.new(commands)
            break
          rescue ::RedisClient::Error => e
            errors ||= []
            errors << e
          end

          return cmd unless cmd.nil?

          raise ::RedisClient::Cluster::InitialSetupError.from_errors(errors)
        end

        private

        def parse_command_reply(rows) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity
          rows&.each_with_object({}) do |row, acc|
            next if row.first.nil?

            # TODO: in redis 7.0 or later, subcommand information included in the command reply

            pos = case row.first
                  when 'eval', 'evalsha', 'zinterstore', 'zunionstore' then 3
                  when 'object', 'xgroup' then 2
                  when 'migrate', 'xread', 'xreadgroup' then 0
                  else row[3]
                  end

            writable = case row.first
                       when 'xgroup' then true
                       else row[2].include?('write')
                       end

            acc[row.first] = ::RedisClient::Cluster::Command::Detail.new(
              first_key_position: pos,
              key_step: row[5],
              write?: writable,
              readonly?: row[2].include?('readonly')
            )
          end.freeze || EMPTY_HASH
        end
      end

      def initialize(commands)
        @commands = commands || EMPTY_HASH
      end

      def extract_first_key(command)
        i = determine_first_key_position(command)
        return EMPTY_STRING if i == 0

        command[i]
      end

      def should_send_to_primary?(command)
        find_command_info(command.first)&.write?
      end

      def should_send_to_replica?(command)
        find_command_info(command.first)&.readonly?
      end

      def exists?(name)
        @commands.key?(name) || @commands.key?(name.to_s.downcase(:ascii))
      end

      private

      def find_command_info(name)
        @commands[name] || @commands[name.to_s.downcase(:ascii)]
      end

      def determine_first_key_position(command) # rubocop:disable Metrics/CyclomaticComplexity, Metrics/AbcSize, Metrics/PerceivedComplexity
        i = find_command_info(command.first)&.first_key_position.to_i
        return i if i > 0

        cmd_name = command.first
        if cmd_name.casecmp('xread').zero?
          determine_optional_key_position(command, 'streams')
        elsif cmd_name.casecmp('xreadgroup').zero?
          determine_optional_key_position(command, 'streams')
        elsif cmd_name.casecmp('migrate').zero?
          command[3].empty? ? determine_optional_key_position(command, 'keys') : 3
        elsif cmd_name.casecmp('memory').zero?
          command[1].to_s.casecmp('usage').zero? ? 2 : 0
        else
          i
        end
      end

      def determine_optional_key_position(command, option_name)
        i = command.index { |v| v.to_s.casecmp(option_name).zero? }
        i.nil? ? 0 : i + 1
      end
    end
  end
end