# frozen_string_literal: true
module Dalli
##
# Contains logic for the pipelined gets implemented by the client.
##
##
# Contains logic for the pipelined gets implemented by the client.
#
# A pipelined get writes a batch of quiet getkq requests to each
# server that owns one of the requested keys, terminates each batch
# with a noop, then drains the responses, yielding each hit back to
# the caller as it arrives.
##
class PipelinedGetter
  # For large batches, interleave sends with response draining to prevent
  # socket buffer deadlock. Only kicks in above this threshold.
  INTERLEAVE_THRESHOLD = 10_000

  # Number of keys to send before draining responses during interleaved mode
  CHUNK_SIZE = 10_000

  # @param ring [Dalli::Ring] maps keys to servers and owns the pipeline lock
  # @param key_manager [Dalli::KeyManager] validates keys and strips namespaces
  def initialize(ring, key_manager)
    @ring = ring
    @key_manager = key_manager
  end

  ##
  # Yields, one at a time, keys and their values+attributes.
  #
  # Returns {} immediately for an empty key list. On a retryable
  # network error (e.g. a timeout) the entire pipeline is restarted
  # from scratch; note this retry is unbounded by design, preserving
  # the client's historical behavior.
  #
  def process(keys, &block)
    return {} if keys.empty?

    @ring.lock do
      # Stores partial results collected during interleaved send phase
      @partial_results = {}
      servers = setup_requests(keys)
      start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      # First yield any partial results collected during interleaved send
      yield_partial_results(&block)
      servers = fetch_responses(servers, start_time, @ring.socket_timeout, &block) until servers.empty?
    end
  rescue Dalli::RetryableNetworkError => e
    Dalli.logger.debug { e.inspect }
    Dalli.logger.debug { 'retrying pipelined gets because of timeout' }
    retry
  end

  private

  # Yields each (key, [value, cas]) pair collected during the
  # interleaved send phase, with the namespace stripped from the key,
  # then clears the buffer so nothing is yielded twice on the main
  # drain loop.
  def yield_partial_results
    @partial_results.each_pair do |key, value_list|
      yield @key_manager.key_without_namespace(key), value_list
    end
    @partial_results.clear
  end

  # Groups the keys by owning server, writes the getkq batches, and
  # terminates each server's batch. Returns the list of servers that
  # still have responses pending.
  def setup_requests(keys)
    groups = groups_for_keys(keys)
    make_getkq_requests(groups)

    # TODO: How does this exit on a NetworkError
    finish_queries(groups.keys)
  end

  ##
  # Loop through the server-grouped sets of keys, writing
  # the corresponding getkq requests to the appropriate servers
  #
  # It's worth noting that we could potentially reduce bytes
  # on the wire by switching from getkq to getq, and using
  # the opaque value to match requests to responses.
  ##
  def make_getkq_requests(groups)
    groups.each do |server, keys_for_server|
      if keys_for_server.size <= INTERLEAVE_THRESHOLD
        # Small batch - send all at once (existing behavior)
        server.request(:pipelined_get, keys_for_server)
      else
        # Large batch - interleave sends with response draining
        # Pass @partial_results directly to avoid hash allocation/merge overhead
        server.request(:pipelined_get_interleaved, keys_for_server, CHUNK_SIZE, @partial_results)
      end
    rescue DalliError, NetworkError => e
      # A failed server just drops out of this batch; its keys will be
      # missing from the results rather than failing the whole call.
      Dalli.logger.debug { e.inspect }
      Dalli.logger.debug { "unable to get keys for server #{server.name}" }
    end
  end

  ##
  # This loops through the servers that have keys in
  # our set, sending the noop to terminate the set of queries.
  #
  # Servers that raise a DalliError while terminating are removed
  # from the list (their results will simply be missing); a
  # NetworkError aborts all pipelines and propagates to the caller.
  ##
  def finish_queries(servers)
    deleted = Set.new

    servers.each do |server|
      next unless server.connected?

      begin
        finish_query_for_server(server)
      rescue Dalli::NetworkError
        raise
      rescue Dalli::DalliError
        deleted << server
      end
    end

    servers.delete_if { |server| deleted.include?(server) }
  rescue Dalli::NetworkError
    abort_without_timeout(servers)
    raise
  end

  # Sends the terminating noop to a single server, logging (and
  # re-raising) a DalliError so the caller can drop that server.
  def finish_query_for_server(server)
    server.pipeline_response_setup
  rescue Dalli::NetworkError
    raise
  rescue Dalli::DalliError => e
    Dalli.logger.debug { e.inspect }
    Dalli.logger.debug { "Results from server: #{server.name} will be missing from the results" }
    raise
  end

  # Swallows Dalli::NetworkError
  def abort_without_timeout(servers)
    servers.each(&:pipeline_abort)
  end

  # Drains one round of responses. Removes finished servers and
  # returns the servers that still have responses outstanding;
  # returns [] when everything is drained or the deadline passed.
  def fetch_responses(servers, start_time, timeout, &block)
    # Remove any servers which are not connected
    servers.select!(&:connected?)
    return [] if servers.empty?

    time_left = remaining_time(start_time, timeout)
    readable_servers = servers_with_response(servers, time_left)
    if readable_servers.empty?
      abort_with_timeout(servers)
      return []
    end

    # Loop through the servers with responses, and
    # delete any from our list that are finished
    readable_servers.each do |server|
      servers.delete(server) if process_server(server, &block)
    end
    servers
  rescue NetworkError
    # Abort and raise if we encountered a network error. This triggers
    # a retry at the top level on RetryableNetworkError.
    abort_without_timeout(servers)
    raise
  end

  # Returns the seconds left before the deadline, clamped at 0.
  def remaining_time(start, timeout)
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
    return 0 if elapsed > timeout

    timeout - elapsed
  end

  # Swallows Dalli::NetworkError
  def abort_with_timeout(servers)
    abort_without_timeout(servers)
    servers.each do |server|
      Dalli.logger.debug { "memcached at #{server.name} did not respond within timeout" }
    end

    true # Required to simplify caller
  end

  # Processes responses from a server. Returns true if there are no
  # additional responses from this server.
  def process_server(server)
    server.pipeline_next_responses do |key, value, cas|
      yield @key_manager.key_without_namespace(key), [value, cas]
    end

    server.pipeline_complete?
  end

  # Waits up to +timeout+ seconds for any of the servers' sockets to
  # become readable and returns the corresponding servers.
  def servers_with_response(servers, timeout)
    return [] if servers.empty?

    sockets = servers.map(&:sock)
    readable, = IO.select(sockets, nil, nil, timeout)
    return [] if readable.nil?

    # For typical server counts (1-5), linear scan is faster than
    # building and looking up a hash map
    readable.filter_map { |sock| servers.find { |s| s.sock == sock } }
  end

  # Validates every key and groups them by owning server. Keys with
  # no matching server (ring is down) are dropped with a debug log.
  def groups_for_keys(*keys)
    keys.flatten!
    keys.map! { |a| @key_manager.validate_key(a.to_s) }
    groups = @ring.keys_grouped_by_server(keys)
    if (unfound_keys = groups.delete(nil))
      Dalli.logger.debug do
        "unable to get keys for #{unfound_keys.length} keys " \
          'because no matching server was found'
      end
    end
    groups
  end
end
end