1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341
|
# frozen_string_literal: true
require 'forwardable'
require 'socket'
require 'timeout'
module Dalli
module Protocol
##
# Access point for a single Memcached server, accessed via Memcached's meta
# protocol. Contains logic for managing connection state to the server (retries, etc),
# formatting requests to the server, and unpacking responses.
##
class Meta < Base
TERMINATOR = "\r\n"
def response_processor
@response_processor ||= ResponseProcessor.new(@connection_manager, @value_marshaller)
end
# NOTE: Additional public methods should be overridden in Dalli::Threadsafe
private
# Retrieval Commands
def get(key, options = nil)
encoded_key, base64 = KeyRegularizer.encode(key)
# Skip bitflags in raw mode - saves 2 bytes per request and skips parsing
skip_flags = raw_mode? || (options && options[:raw])
req = RequestFormatter.meta_get(key: encoded_key, base64: base64, skip_flags: skip_flags)
write(req)
@connection_manager.flush
response_processor.meta_get_with_value(cache_nils: cache_nils?(options))
end
def quiet_get_request(key)
encoded_key, base64 = KeyRegularizer.encode(key)
# Skip bitflags in raw mode - saves 2 bytes per request and skips parsing
RequestFormatter.meta_get(key: encoded_key, return_cas: true, base64: base64, quiet: true,
skip_flags: raw_mode?)
end
def gat(key, ttl, options = nil)
ttl = TtlSanitizer.sanitize(ttl)
encoded_key, base64 = KeyRegularizer.encode(key)
skip_flags = raw_mode? || (options && options[:raw])
req = RequestFormatter.meta_get(key: encoded_key, ttl: ttl, base64: base64, skip_flags: skip_flags)
write(req)
@connection_manager.flush
response_processor.meta_get_with_value(cache_nils: cache_nils?(options))
end
def touch(key, ttl)
ttl = TtlSanitizer.sanitize(ttl)
encoded_key, base64 = KeyRegularizer.encode(key)
req = RequestFormatter.meta_get(key: encoded_key, ttl: ttl, value: false, base64: base64)
write(req)
@connection_manager.flush
response_processor.meta_get_without_value
end
# TODO: This is confusing, as there's a cas command in memcached
# and this isn't it. Maybe rename? Maybe eliminate?
def cas(key)
encoded_key, base64 = KeyRegularizer.encode(key)
req = RequestFormatter.meta_get(key: encoded_key, value: true, return_cas: true, base64: base64)
write(req)
@connection_manager.flush
response_processor.meta_get_with_value_and_cas
end
# Comprehensive meta get with support for all metadata flags.
# @note Requires memcached 1.6+ (meta protocol feature)
#
# This is the full-featured get method that supports:
# - Thundering herd protection (vivify_ttl, recache_ttl)
# - Item metadata (hit_status, last_access)
# - LRU control (skip_lru_bump)
#
# @param key [String] the key to retrieve
# @param options [Hash] options controlling what metadata to return
# - :vivify_ttl [Integer] creates a stub on miss with this TTL (N flag)
# - :recache_ttl [Integer] wins recache race if remaining TTL is below this (R flag)
# - :return_hit_status [Boolean] return whether item was previously accessed (h flag)
# - :return_last_access [Boolean] return seconds since last access (l flag)
# - :skip_lru_bump [Boolean] don't bump LRU or update access stats (u flag)
# - :cache_nils [Boolean] whether to cache nil values
# @return [Hash] containing:
# - :value - the cached value (or nil on miss)
# - :cas - the CAS value
# - :won_recache - true if client won recache race (W flag)
# - :stale - true if item is stale (X flag)
# - :lost_recache - true if another client is recaching (Z flag)
# - :hit_before - true/false if previously accessed (only if return_hit_status: true)
# - :last_access - seconds since last access (only if return_last_access: true)
def meta_get(key, options = {})
encoded_key, base64 = KeyRegularizer.encode(key)
req = RequestFormatter.meta_get(
key: encoded_key, value: true, return_cas: true, base64: base64,
vivify_ttl: options[:vivify_ttl], recache_ttl: options[:recache_ttl],
return_hit_status: options[:return_hit_status],
return_last_access: options[:return_last_access], skip_lru_bump: options[:skip_lru_bump]
)
write(req)
@connection_manager.flush
response_processor.meta_get_with_metadata(
cache_nils: cache_nils?(options), return_hit_status: options[:return_hit_status],
return_last_access: options[:return_last_access]
)
end
# Delete with stale invalidation instead of actual deletion.
# Used with thundering herd protection to mark items as stale rather than removing them.
# @note Requires memcached 1.6+ (meta protocol feature)
#
# @param key [String] the key to invalidate
# @param cas [Integer] optional CAS value for compare-and-swap
# @return [Boolean] true if successful
def delete_stale(key, cas = nil)
encoded_key, base64 = KeyRegularizer.encode(key)
req = RequestFormatter.meta_delete(key: encoded_key, cas: cas, base64: base64, stale: true)
write(req)
@connection_manager.flush
response_processor.meta_delete
end
# Storage Commands
def set(key, value, ttl, cas, options)
write_storage_req(:set, key, value, ttl, cas, options)
response_processor.meta_set_with_cas unless quiet?
end
# Pipelined set - writes a quiet set request without reading response.
# Used by PipelinedSetter for bulk operations.
def pipelined_set(key, value, ttl, options)
write_storage_req(:set, key, value, ttl, nil, options, quiet: true)
end
def add(key, value, ttl, options)
write_storage_req(:add, key, value, ttl, nil, options)
response_processor.meta_set_with_cas unless quiet?
end
def replace(key, value, ttl, cas, options)
write_storage_req(:replace, key, value, ttl, cas, options)
response_processor.meta_set_with_cas unless quiet?
end
# rubocop:disable Metrics/ParameterLists
def write_storage_req(mode, key, raw_value, ttl = nil, cas = nil, options = {}, quiet: quiet?)
(value, bitflags) = @value_marshaller.store(key, raw_value, options)
ttl = TtlSanitizer.sanitize(ttl) if ttl
encoded_key, base64 = KeyRegularizer.encode(key)
req = RequestFormatter.meta_set(key: encoded_key, value: value,
bitflags: bitflags, cas: cas,
ttl: ttl, mode: mode, quiet: quiet, base64: base64)
write(req)
write(value)
write(TERMINATOR)
@connection_manager.flush unless quiet
end
# rubocop:enable Metrics/ParameterLists
def append(key, value)
write_append_prepend_req(:append, key, value)
response_processor.meta_set_append_prepend unless quiet?
end
def prepend(key, value)
write_append_prepend_req(:prepend, key, value)
response_processor.meta_set_append_prepend unless quiet?
end
# rubocop:disable Metrics/ParameterLists
def write_append_prepend_req(mode, key, value, ttl = nil, cas = nil, _options = {})
ttl = TtlSanitizer.sanitize(ttl) if ttl
encoded_key, base64 = KeyRegularizer.encode(key)
req = RequestFormatter.meta_set(key: encoded_key, value: value, base64: base64,
cas: cas, ttl: ttl, mode: mode, quiet: quiet?)
write(req)
write(value)
write(TERMINATOR)
@connection_manager.flush unless quiet?
end
# rubocop:enable Metrics/ParameterLists
# Delete Commands
def delete(key, cas)
encoded_key, base64 = KeyRegularizer.encode(key)
req = RequestFormatter.meta_delete(key: encoded_key, cas: cas,
base64: base64, quiet: quiet?)
write(req)
@connection_manager.flush unless quiet?
response_processor.meta_delete unless quiet?
end
# Pipelined delete - writes a quiet delete request without reading response.
# Used by PipelinedDeleter for bulk operations.
def pipelined_delete(key)
encoded_key, base64 = KeyRegularizer.encode(key)
req = RequestFormatter.meta_delete(key: encoded_key, base64: base64, quiet: true)
write(req)
end
# Arithmetic Commands
def decr(key, count, ttl, initial)
decr_incr false, key, count, ttl, initial
end
def incr(key, count, ttl, initial)
decr_incr true, key, count, ttl, initial
end
def decr_incr(incr, key, delta, ttl, initial)
ttl = initial ? TtlSanitizer.sanitize(ttl) : nil # Only set a TTL if we want to set a value on miss
encoded_key, base64 = KeyRegularizer.encode(key)
write(RequestFormatter.meta_arithmetic(key: encoded_key, delta: delta, initial: initial, incr: incr, ttl: ttl,
quiet: quiet?, base64: base64))
@connection_manager.flush unless quiet?
response_processor.decr_incr unless quiet?
end
# Other Commands
def flush(delay = 0)
write(RequestFormatter.flush(delay: delay))
@connection_manager.flush unless quiet?
response_processor.flush unless quiet?
end
# Noop is a keepalive operation but also used to demarcate the end of a set of pipelined commands.
# We need to read all the responses at once.
def noop
write_noop
response_processor.consume_all_responses_until_mn
end
def stats(info = nil)
write(RequestFormatter.stats(info))
@connection_manager.flush
response_processor.stats
end
def reset_stats
write(RequestFormatter.stats('reset'))
@connection_manager.flush
response_processor.reset
end
def version
write(RequestFormatter.version)
@connection_manager.flush
response_processor.version
end
def write_noop
write(RequestFormatter.meta_noop)
@connection_manager.flush
end
# Single-server fast path for get_multi. Inlines request formatting and
# response parsing to minimize per-key overhead. Avoids the PipelinedGetter
# machinery (IO.select, response buffering, server grouping).
def read_multi_req(keys)
is_raw = raw_mode?
# Inline request formatting — avoids RequestFormatter.meta_get overhead per key.
# In raw mode: "mg <key> v k q s\r\n" (no f flag, key at index 2)
# Normal mode: "mg <key> v f k q s\r\n" (key at index 3)
post_get = is_raw ? " v k q s\r\n" : " v f k q s\r\n"
keys.each do |key|
encoded_key, base64 = KeyRegularizer.encode(key)
write(base64 ? "mg #{encoded_key} b#{post_get}" : "mg #{encoded_key}#{post_get}")
end
write("mn\r\n")
@connection_manager.flush
read_multi_get_responses(is_raw)
end
def read_multi_get_responses(is_raw)
hash = {}
key_index = is_raw ? 2 : 3
while (line = @connection_manager.read_line)
break if line.start_with?('MN')
next unless line.start_with?('VA ')
key, value = parse_multi_get_value(line, key_index, is_raw)
hash[key] = value if key
end
hash
end
def parse_multi_get_value(line, key_index, is_raw)
tokens = line.chomp!(TERMINATOR).split
value = @connection_manager.read(tokens[1].to_i + TERMINATOR.bytesize)&.chomp!(TERMINATOR)
raw_key = tokens[key_index]
return unless raw_key
key = KeyRegularizer.decode(raw_key[1..], tokens.include?('b'))
bitflags = is_raw ? 0 : response_processor.bitflags_from_tokens(tokens)
[key, @value_marshaller.retrieve(value, bitflags)]
end
# Single-server fast path for set_multi. Inlines request formatting to
# minimize per-key overhead. Avoids PipelinedSetter server grouping.
def write_multi_req(pairs, ttl, req_options)
ttl = TtlSanitizer.sanitize(ttl) if ttl
pairs.each do |key, raw_value|
(value, bitflags) = @value_marshaller.store(key, raw_value, req_options)
encoded_key, base64 = KeyRegularizer.encode(key)
# Inline format: "ms <key> <size> c [b] F<flags> T<ttl> MS q\r\n"
cmd = "ms #{encoded_key} #{value.bytesize} c"
cmd << ' b' if base64
cmd << " F#{bitflags}" if bitflags
cmd << " T#{ttl}" if ttl
cmd << " MS q\r\n"
write(cmd)
write(value)
write(TERMINATOR)
end
write_noop
response_processor.consume_all_responses_until_mn
end
# Single-server fast path for delete_multi. Writes all quiet delete requests
# terminated by a noop, then consumes all responses.
def delete_multi_req(keys)
keys.each do |key|
encoded_key, base64 = KeyRegularizer.encode(key)
# Inline format: "md <key> [b] q\r\n"
write(base64 ? "md #{encoded_key} b q\r\n" : "md #{encoded_key} q\r\n")
end
write_noop
response_processor.consume_all_responses_until_mn
end
require_relative 'key_regularizer'
require_relative 'request_formatter'
require_relative 'response_processor'
end
end
end
|