1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
|
# frozen_string_literal: true
require 'forwardable'
require 'socket'
require 'timeout'
module Dalli
module Protocol
##
# Access point for a single Memcached server, accessed via Memcached's binary
# protocol. Contains logic for managing connection state to the server (retries, etc),
# formatting requests to the server, and unpacking responses.
##
class Binary < Base
# Lazily builds the ResponseProcessor for this server from the
# connection manager and value marshaller, and reuses the same
# instance on every later call.
def response_processor
  @response_processor ||=
    ResponseProcessor.new(@connection_manager, @value_marshaller)
end
private
# Retrieval Commands
# Writes a :get request for +key+, then returns the result of the
# response processor's #get, called with the cache_nils flag that
# cache_nils? computes from +options+.
def get(key, options = nil)
  write(RequestFormatter.standard_request(opkey: :get, key: key))
  processor = response_processor
  processor.get(cache_nils: cache_nils?(options))
end
# Builds (but does not send) a :getkq request for +key+ and returns it.
def quiet_get_request(key)
  opkey = :getkq
  RequestFormatter.standard_request(opkey: opkey, key: key)
end
# Writes a :gat request for +key+ carrying the sanitized +ttl+, then
# returns the result of the response processor's #get, called with the
# cache_nils flag that cache_nils? computes from +options+.
def gat(key, ttl, options = nil)
  sanitized_ttl = TtlSanitizer.sanitize(ttl)
  write(RequestFormatter.standard_request(opkey: :gat, key: key, ttl: sanitized_ttl))
  response_processor.get(cache_nils: cache_nils?(options))
end
# Writes a :touch request for +key+ carrying the sanitized +ttl+ and
# returns the response processor's generic_response.
def touch(key, ttl)
  req = RequestFormatter.standard_request(
    opkey: :touch,
    key: key,
    ttl: TtlSanitizer.sanitize(ttl)
  )
  write(req)
  response_processor.generic_response
end
# Issues a plain :get for +key+ and returns the response processor's
# data_cas_response.
#
# TODO: This is confusing, as there's a cas command in memcached
# and this isn't it. Maybe rename? Maybe eliminate?
def cas(key)
  write(RequestFormatter.standard_request(opkey: :get, key: key))
  response_processor.data_cas_response
end
# Storage Commands
# Stores +value+ under +key+ via storage_req, using the :setq opcode
# when quiet? is true and :set otherwise.
def set(key, value, ttl, cas, options)
  storage_req(quiet? ? :setq : :set, key, value, ttl, cas, options)
end
# Adds +value+ under +key+ via storage_req, using the :addq opcode when
# quiet? is true and :add otherwise. The cas argument is always 0.
def add(key, value, ttl, options)
  opkey =
    if quiet?
      :addq
    else
      :add
    end
  storage_req(opkey, key, value, ttl, 0, options)
end
# Replaces the value under +key+ via storage_req, using the :replaceq
# opcode when quiet? is true and :replace otherwise.
def replace(key, value, ttl, cas, options)
  storage_req(quiet? ? :replaceq : :replace, key, value, ttl, cas, options)
end
# rubocop:disable Metrics/ParameterLists
# Shared implementation behind the storage commands. Passes +value+
# through @value_marshaller.store (which yields the value to send plus
# its bitflags), sanitizes +ttl+, and writes the resulting request.
# Returns the response processor's storage_response, or nil without
# reading a response when quiet? is true.
def storage_req(opkey, key, value, ttl, cas, options)
  payload, bitflags = @value_marshaller.store(key, value, options)
  expiry = TtlSanitizer.sanitize(ttl)
  write(
    RequestFormatter.standard_request(
      opkey: opkey, key: key, value: payload,
      bitflags: bitflags, ttl: expiry, cas: cas
    )
  )
  return if quiet?

  response_processor.storage_response
end
# rubocop:enable Metrics/ParameterLists
# Appends +value+ to +key+ via write_append_prepend, using the :appendq
# opcode when quiet? is true and :append otherwise.
def append(key, value)
  write_append_prepend(quiet? ? :appendq : :append, key, value)
end
# Prepends +value+ to +key+ via write_append_prepend, using the
# :prependq opcode when quiet? is true and :prepend otherwise.
def prepend(key, value)
  opkey =
    if quiet?
      :prependq
    else
      :prepend
    end
  write_append_prepend(opkey, key, value)
end
# Shared implementation behind append and prepend. Writes a request for
# +opkey+ with +key+ and +value+. Returns the response processor's
# no_body_response, or nil without reading a response when quiet? is true.
def write_append_prepend(opkey, key, value)
  req = RequestFormatter.standard_request(opkey: opkey, key: key, value: value)
  write(req)
  return if quiet?

  response_processor.no_body_response
end
# Delete Commands
# Writes a delete request for +key+ with the given +cas+, using the
# :deleteq opcode when quiet? is true and :delete otherwise. Returns the
# response processor's #delete result, or nil without reading a
# response when quiet? is true.
def delete(key, cas)
  write(
    RequestFormatter.standard_request(
      opkey: (quiet? ? :deleteq : :delete),
      key: key,
      cas: cas
    )
  )
  return if quiet?

  response_processor.delete
end
# Arithmetic Commands
# Decrements +key+ via decr_incr, using the :decrq opcode when quiet? is
# true and :decr otherwise.
def decr(key, count, ttl, initial)
  decr_incr(quiet? ? :decrq : :decr, key, count, ttl, initial)
end
# Increments +key+ via decr_incr, using the :incrq opcode when quiet? is
# true and :incr otherwise.
def incr(key, count, ttl, initial)
  opkey =
    if quiet?
      :incrq
    else
      :incr
    end
  decr_incr(opkey, key, count, ttl, initial)
end
# Expiry sentinel sent when no initial value is supplied. It lets a nil
# initial value be treated differently from zero: with this expiry,
# memcached returns a not found if the key doesn't already exist,
# rather than setting the initial value.
NOT_FOUND_EXPIRY = 0xFFFF_FFFF

# Shared implementation behind incr and decr. When +initial+ is truthy
# the request carries the sanitized +ttl+ and that initial value;
# otherwise it carries NOT_FOUND_EXPIRY and an initial value of 0.
# Returns the response processor's decr_incr result, or nil without
# reading a response when quiet? is true.
def decr_incr(opkey, key, count, ttl, initial)
  if initial
    expiry = TtlSanitizer.sanitize(ttl)
    seed = initial
  else
    expiry = NOT_FOUND_EXPIRY
    seed = 0
  end
  req = RequestFormatter.decr_incr_request(
    opkey: opkey, key: key, count: count, initial: seed, expiry: expiry
  )
  write(req)
  return if quiet?

  response_processor.decr_incr
end
# Other Commands
# Writes a flush request carrying +ttl+ (default 0, passed through
# as-is), using the :flushq opcode when quiet? is true and :flush
# otherwise. Returns the response processor's no_body_response, or nil
# without reading a response when quiet? is true.
def flush(ttl = 0)
  req = RequestFormatter.standard_request(opkey: (quiet? ? :flushq : :flush), ttl: ttl)
  write(req)
  return if quiet?

  response_processor.no_body_response
end
# Noop is a keepalive operation, but it also marks the end of a set of
# pipelined commands, so after sending it we read every outstanding
# response up to and including the noop's own.
def noop
  write_noop
  processor = response_processor
  processor.consume_all_responses_until_noop
end
# Writes a :stat request whose key is +info+ (an empty string by
# default) and returns the response processor's #stats result.
def stats(info = '')
  write(RequestFormatter.standard_request(opkey: :stat, key: info))
  response_processor.stats
end
# Writes a :stat request with the key 'reset' and returns the response
# processor's #reset result.
def reset_stats
  req = RequestFormatter.standard_request(opkey: :stat, key: 'reset')
  write(req)
  response_processor.reset
end
# Writes a :version request and returns the response processor's
# #version result.
def version
  req = RequestFormatter.standard_request(opkey: :version)
  write(req)
  response_processor.version
end
# Writes a :noop request without reading any response; returns whatever
# #write returns.
def write_noop
  write(RequestFormatter.standard_request(opkey: :noop))
end
require_relative 'binary/request_formatter'
require_relative 'binary/response_header'
require_relative 'binary/response_processor'
require_relative 'binary/sasl_authentication'
include SaslAuthentication
end
end
end
|