File: ring.rb

package info (click to toggle)
ruby-dalli 3.0.6-1.1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 544 kB
  • sloc: ruby: 4,965; sh: 11; makefile: 6
file content (147 lines) | stat: -rw-r--r-- 3,993 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# frozen_string_literal: true

require 'digest/sha1'
require 'zlib'

module Dalli
  ##
  # An implementation of a consistent hash ring, designed to minimize
  # the cache miss impact of adding or removing servers from the ring.
  # That is, adding or removing a server from the ring should impact
  # the key -> server mapping of ~ 1/N of the stored keys where N is the
  # number of servers in the ring.  This is done by creating a large
  # number of "points" per server, distributed over the space
  # 0x00000000 - 0xFFFFFFFF. For a given key, we calculate the CRC32
  # hash, and find the nearest "point" that is less than or equal to
  # the key's hash.  In this implementation, each "point" is represented
  # by a Dalli::Ring::Entry.
  ##
  class Ring
    # The number of entries on the continuum created per server
    # in an equally weighted scenario.
    POINTS_PER_SERVER = 160 # this is the default in libmemcached

    attr_accessor :servers, :continuum

    ##
    # Builds a ring over +servers+.
    #
    # servers:: collection of server objects; each must respond to the
    #           methods this class calls on it (name, weight, alive?,
    #           lock!, unlock!, request, socket_timeout).
    # options:: hash of settings. Passing <tt>threadsafe: false</tt> skips
    #           extending the servers with Dalli::Threadsafe; passing
    #           <tt>failover: false</tt> disables re-hashing to another
    #           server when the selected one is down. Any other value
    #           (including a missing key) leaves the feature enabled.
    ##
    def initialize(servers, options = {})
      @servers = servers
      # A continuum is only built when there is more than one server to
      # choose between; otherwise the first server is always used.
      @continuum = nil
      @continuum = build_continuum(servers) if servers.size > 1

      threadsafe! unless options[:threadsafe] == false
      @failover = options[:failover] != false
    end

    ##
    # Returns the server responsible for +key+.
    #
    # Raises Dalli::RingError when no server could be selected or the
    # selected server is not alive.
    ##
    def server_for_key(key)
      server = @continuum ? server_from_continuum(key) : @servers.first

      # Note that the call to alive? has the side effect of initializing
      # the socket
      return server if server&.alive?

      raise Dalli::RingError, 'No server available'
    end

    ##
    # Looks +key+ up on the continuum. If the selected server is down and
    # failover is enabled, the key is re-hashed with the attempt number
    # prepended and the lookup is retried, for at most 20 attempts.
    #
    # Returns the first live server found, or nil if none was found.
    ##
    def server_from_continuum(key)
      hkey = hash_for(key)
      20.times do |try|
        server = server_for_hash_key(hkey)

        # Note that the call to alive? has the side effect of initializing
        # the socket
        return server if server.alive?
        break unless @failover

        hkey = hash_for("#{try}#{key}")
      end
      nil
    end

    ##
    # Groups +key_arr+ by the server each key maps to. Keys for which no
    # server is available are logged at debug level and grouped under nil.
    ##
    def keys_grouped_by_server(key_arr)
      key_arr.group_by do |key|
        server_for_key(key)
      rescue Dalli::RingError
        Dalli.logger.debug { "unable to get key #{key}" }
        nil
      end
    end

    ##
    # Locks every server, yields, and then unlocks the servers again.
    # Returns the value of the block.
    #
    # Only servers whose lock! call completed are unlocked, so a failure
    # part-way through acquisition does not leave earlier servers locked.
    ##
    def lock
      locked = []
      @servers.each do |server|
        server.lock!
        locked << server
      end
      yield
    ensure
      locked.each(&:unlock!)
    end

    ##
    # Sends a :noop request to every server.
    ##
    def flush_multi_responses
      @servers.each do |s|
        s.request(:noop)
      rescue Dalli::NetworkError
        # Ignore this error, as it indicates the socket is unavailable
        # and there's no need to flush
      end
    end

    ##
    # Returns the socket timeout reported by the first server.
    ##
    def socket_timeout
      @servers.first.socket_timeout
    end

    private

    # Extends every server with Dalli::Threadsafe.
    def threadsafe!
      @servers.each do |s|
        s.extend(Dalli::Threadsafe)
      end
    end

    # CRC32 hash of +key+, used as the key's position on the continuum.
    def hash_for(key)
      Zlib.crc32(key)
    end

    # Number of continuum entries to create for +server+: the total number
    # of points (total_servers * POINTS_PER_SERVER) scaled by the server's
    # share of +total_weight+, rounded down.
    def entry_count_for(server, total_servers, total_weight)
      ((total_servers * POINTS_PER_SERVER * server.weight) / Float(total_weight)).floor
    end

    # Returns the server owning the closest continuum entry whose value
    # is <= +hash_key+.
    def server_for_hash_key(hash_key)
      # Index of the first entry strictly greater than hash_key; the entry
      # just before it is the one we want. When no entry is greater we
      # fall back to the continuum size so that the subtraction below
      # selects the last entry.
      upper_idx = @continuum.bsearch_index { |entry| entry.value > hash_key } || @continuum.size

      # When upper_idx is 0 (hash_key is below every entry) the index is
      # -1, which wraps around to the last entry of the continuum.
      @continuum[upper_idx - 1].server
    end

    # Creates the sorted list of entries for +servers+. Each entry's value
    # is the first 32 bits of the SHA1 digest of "<server name>:<index>".
    def build_continuum(servers)
      total_weight = servers.sum(&:weight)
      continuum = []
      servers.each do |server|
        entry_count_for(server, servers.size, total_weight).times do |idx|
          digest = Digest::SHA1.hexdigest("#{server.name}:#{idx}")
          # The first 8 hex characters are a 32-bit unsigned value.
          value = Integer(digest[0, 8], 16)
          continuum << Dalli::Ring::Entry.new(value, server)
        end
      end
      continuum.sort_by(&:value)
    end

    ##
    # Represents a point in the consistent hash ring implementation.
    ##
    class Entry
      attr_reader :value, :server

      def initialize(val, srv)
        @value = val
        @server = srv
      end
    end
  end
end