1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
|
# frozen_string_literal: true
require 'digest/sha1'
require 'zlib'
module Dalli
##
# An implementation of a consistent hash ring, designed to minimize
# the cache miss impact of adding or removing servers from the ring.
# That is, adding or removing a server from the ring should impact
# the key -> server mapping of ~ 1/N of the stored keys where N is the
# number of servers in the ring. This is done by creating a large
# number of "points" per server, distributed over the space
# 0x00000000 - 0xFFFFFFFF. For a given key, we calculate the CRC32
# hash, and find the nearest "point" that is less than or equal to the
# the key's hash. In this implemetation, each "point" is represented
# by a Dalli::Ring::Entry.
##
class Ring
# The number of entries on the continuum created per server
# in an equally weighted scenario.
POINTS_PER_SERVER = 160 # this is the default in libmemcached
attr_accessor :servers, :continuum
def initialize(servers, options)
@servers = servers
@continuum = nil
@continuum = build_continuum(servers) if servers.size > 1
threadsafe! unless options[:threadsafe] == false
@failover = options[:failover] != false
end
def server_for_key(key)
server = if @continuum
server_from_continuum(key)
else
@servers.first
end
# Note that the call to alive? has the side effect of initializing
# the socket
return server if server&.alive?
raise Dalli::RingError, 'No server available'
end
def server_from_continuum(key)
hkey = hash_for(key)
20.times do |try|
server = server_for_hash_key(hkey)
# Note that the call to alive? has the side effect of initializing
# the socket
return server if server.alive?
break unless @failover
hkey = hash_for("#{try}#{key}")
end
nil
end
def keys_grouped_by_server(key_arr)
key_arr.group_by do |key|
server_for_key(key)
rescue Dalli::RingError
Dalli.logger.debug { "unable to get key #{key}" }
nil
end
end
def lock
@servers.each(&:lock!)
begin
yield
ensure
@servers.each(&:unlock!)
end
end
def flush_multi_responses
@servers.each do |s|
s.request(:noop)
rescue Dalli::NetworkError
# Ignore this error, as it indicates the socket is unavailable
# and there's no need to flush
end
end
def socket_timeout
@servers.first.socket_timeout
end
private
def threadsafe!
@servers.each do |s|
s.extend(Dalli::Threadsafe)
end
end
def hash_for(key)
Zlib.crc32(key)
end
def entry_count_for(server, total_servers, total_weight)
((total_servers * POINTS_PER_SERVER * server.weight) / Float(total_weight)).floor
end
def server_for_hash_key(hash_key)
# Find the closest index in the Ring with value <= the given value
entryidx = @continuum.bsearch_index { |entry| entry.value > hash_key }
if entryidx.nil?
entryidx = @continuum.size - 1
else
entryidx -= 1
end
@continuum[entryidx].server
end
def build_continuum(servers)
continuum = []
total_weight = servers.inject(0) { |memo, srv| memo + srv.weight }
servers.each do |server|
entry_count_for(server, servers.size, total_weight).times do |idx|
hash = Digest::SHA1.hexdigest("#{server.name}:#{idx}")
value = Integer("0x#{hash[0..7]}")
continuum << Dalli::Ring::Entry.new(value, server)
end
end
continuum.sort_by(&:value)
end
##
# Represents a point in the consistent hash ring implementation.
##
class Entry
attr_reader :value, :server
def initialize(val, srv)
@value = val
@server = srv
end
end
end
end
|