File: latency_replica.rb

package info (click to toggle)
ruby-redis-cluster-client 0.10.0-1.1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 208 kB
  • sloc: ruby: 2,214; makefile: 4
file content (84 lines) | stat: -rw-r--r-- 2,896 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# frozen_string_literal: true

require 'redis_client/cluster/node/base_topology'

class RedisClient
  class Cluster
    class Node
      class LatencyReplica < BaseTopology
        DUMMY_LATENCY_MSEC = 100 * 1000 * 1000
        MEASURE_ATTEMPT_COUNT = 10

        def clients_for_scanning(seed: nil) # rubocop:disable Lint/UnusedMethodArgument
          @clients_for_scanning
        end

        def find_node_key_of_replica(primary_node_key, seed: nil) # rubocop:disable Lint/UnusedMethodArgument
          @replications.fetch(primary_node_key, EMPTY_ARRAY).first || primary_node_key
        end

        def any_replica_node_key(seed: nil)
          random = seed.nil? ? Random : Random.new(seed)
          @existed_replicas.sample(random: random)&.first || any_primary_node_key(seed: seed)
        end

        def process_topology_update!(replications, options)
          super

          all_replica_clients = @clients.select { |k, _| @replica_node_keys.include?(k) }
          latencies = measure_latencies(all_replica_clients, @concurrent_worker)
          @replications.each_value { |keys| keys.sort_by! { |k| latencies.fetch(k) } }
          @replica_clients = select_replica_clients(@replications, @clients)
          @clients_for_scanning = select_clients_for_scanning(@replications, @clients)
          @existed_replicas = @replications.values.reject(&:empty?)
        end

        private

        def measure_latencies(clients, concurrent_worker) # rubocop:disable Metrics/AbcSize
          return {} if clients.empty?

          work_group = concurrent_worker.new_group(size: clients.size)

          clients.each do |node_key, client|
            work_group.push(node_key, client) do |cli|
              min = DUMMY_LATENCY_MSEC
              MEASURE_ATTEMPT_COUNT.times do
                starting = obtain_current_time
                cli.call_once('PING')
                duration = obtain_current_time - starting
                min = duration if duration < min
              end

              min
            rescue StandardError
              DUMMY_LATENCY_MSEC
            end
          end

          latencies = {}
          work_group.each { |node_key, v| latencies[node_key] = v }
          work_group.close
          latencies
        end

        def obtain_current_time
          Process.clock_gettime(Process::CLOCK_MONOTONIC, :microsecond)
        end

        def select_replica_clients(replications, clients)
          keys = replications.values.filter_map(&:first)
          clients.select { |k, _| keys.include?(k) }
        end

        def select_clients_for_scanning(replications, clients)
          keys = replications.map do |primary_node_key, replica_node_keys|
            replica_node_keys.empty? ? primary_node_key : replica_node_keys.first
          end

          clients.select { |k, _| keys.include?(k) }
        end
      end
    end
  end
end