File: meta.rb

package info (click to toggle)
ruby-dalli 5.0.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 992 kB
  • sloc: ruby: 9,447; sh: 19; makefile: 4
file content (341 lines) | stat: -rw-r--r-- 13,848 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
# frozen_string_literal: true

require 'forwardable'
require 'socket'
require 'timeout'

module Dalli
  module Protocol
    ##
    # Access point for a single Memcached server, accessed via Memcached's meta
    # protocol.  Contains logic for managing connection state to the server (retries, etc),
    # formatting requests to the server, and unpacking responses.
    ##
    class Meta < Base
      # Line ending written after each stored payload and stripped from response lines
      # and payloads when they are parsed.
      TERMINATOR = "\r\n"

      # Returns the ResponseProcessor for this server. It is built on first use from the
      # connection manager and value marshaller, and the same instance is reused afterwards.
      def response_processor
        return @response_processor if @response_processor

        @response_processor = ResponseProcessor.new(@connection_manager, @value_marshaller)
      end

      # NOTE: Additional public methods should be overridden in Dalli::Threadsafe

      private

      # Retrieval Commands

      # Writes a meta get for +key+, flushes the connection and returns whatever the
      # response processor parses as the value.
      # The bitflags are skipped when the server is in raw mode or the caller passed
      # raw: true - this saves 2 bytes per request and skips parsing.
      def get(key, options = nil)
        wire_key, is_base64 = KeyRegularizer.encode(key)
        omit_flags = raw_mode? || (options && options[:raw])
        write(RequestFormatter.meta_get(key: wire_key, base64: is_base64, skip_flags: omit_flags))
        @connection_manager.flush
        response_processor.meta_get_with_value(cache_nils: cache_nils?(options))
      end

      # Builds (but does not send) a quiet meta get request for +key+ that also asks for
      # the CAS value. Bitflags are skipped in raw mode - saves 2 bytes per request and
      # skips parsing.
      def quiet_get_request(key)
        wire_key, is_base64 = KeyRegularizer.encode(key)
        request_args = {
          key: wire_key, return_cas: true, base64: is_base64,
          quiet: true, skip_flags: raw_mode?
        }
        RequestFormatter.meta_get(**request_args)
      end

      # Get-and-touch: writes a meta get for +key+ that carries the sanitized +ttl+,
      # flushes the connection and returns the value parsed by the response processor.
      # Bitflags are skipped in raw mode or when the caller passed raw: true.
      def gat(key, ttl, options = nil)
        expiry = TtlSanitizer.sanitize(ttl)
        wire_key, is_base64 = KeyRegularizer.encode(key)
        omit_flags = raw_mode? || (options && options[:raw])
        request = RequestFormatter.meta_get(
          key: wire_key, ttl: expiry, base64: is_base64, skip_flags: omit_flags
        )
        write(request)
        @connection_manager.flush
        response_processor.meta_get_with_value(cache_nils: cache_nils?(options))
      end

      # Writes a meta get for +key+ carrying the sanitized +ttl+ with value: false,
      # flushes the connection and returns the result of reading a value-less response.
      def touch(key, ttl)
        expiry = TtlSanitizer.sanitize(ttl)
        wire_key, is_base64 = KeyRegularizer.encode(key)
        write(RequestFormatter.meta_get(key: wire_key, ttl: expiry, value: false, base64: is_base64))
        @connection_manager.flush
        response_processor.meta_get_without_value
      end

      # TODO: The name is confusing - memcached has a cas command and this is not it.
      # Consider renaming or removing.
      #
      # Writes a meta get for +key+ that requests both the value and the CAS value,
      # flushes the connection and returns what the response processor parses.
      def cas(key)
        wire_key, is_base64 = KeyRegularizer.encode(key)
        write(RequestFormatter.meta_get(key: wire_key, value: true, return_cas: true, base64: is_base64))
        @connection_manager.flush
        response_processor.meta_get_with_value_and_cas
      end

      # Comprehensive meta get with support for all metadata flags.
      # @note Requires memcached 1.6+ (meta protocol feature)
      #
      # This is the full-featured get method that supports:
      # - Thundering herd protection (vivify_ttl, recache_ttl)
      # - Item metadata (hit_status, last_access)
      # - LRU control (skip_lru_bump)
      #
      # @param key [String] the key to retrieve
      # @param options [Hash, nil] options controlling what metadata to return;
      #   nil is treated as an empty Hash
      #   - :vivify_ttl [Integer] creates a stub on miss with this TTL (N flag)
      #   - :recache_ttl [Integer] wins recache race if remaining TTL is below this (R flag)
      #   - :return_hit_status [Boolean] return whether item was previously accessed (h flag)
      #   - :return_last_access [Boolean] return seconds since last access (l flag)
      #   - :skip_lru_bump [Boolean] don't bump LRU or update access stats (u flag)
      #   - :cache_nils [Boolean] whether to cache nil values
      # @return [Hash] containing:
      #   - :value - the cached value (or nil on miss)
      #   - :cas - the CAS value
      #   - :won_recache - true if client won recache race (W flag)
      #   - :stale - true if item is stale (X flag)
      #   - :lost_recache - true if another client is recaching (Z flag)
      #   - :hit_before - true/false if previously accessed (only if return_hit_status: true)
      #   - :last_access - seconds since last access (only if return_last_access: true)
      def meta_get(key, options = {})
        # get/gat accept nil options; do the same here instead of failing on nil[...].
        options ||= {}
        encoded_key, base64 = KeyRegularizer.encode(key)
        # These two flags shape both the request and how the response is parsed.
        hit_status = options[:return_hit_status]
        last_access = options[:return_last_access]
        req = RequestFormatter.meta_get(
          key: encoded_key, value: true, return_cas: true, base64: base64,
          vivify_ttl: options[:vivify_ttl], recache_ttl: options[:recache_ttl],
          return_hit_status: hit_status, return_last_access: last_access,
          skip_lru_bump: options[:skip_lru_bump]
        )
        write(req)
        @connection_manager.flush
        response_processor.meta_get_with_metadata(
          cache_nils: cache_nils?(options), return_hit_status: hit_status,
          return_last_access: last_access
        )
      end

      # Marks an item as stale instead of actually deleting it.
      # Used with thundering herd protection so items are flagged stale rather than removed.
      # @note Requires memcached 1.6+ (meta protocol feature)
      #
      # @param key [String] the key to invalidate
      # @param cas [Integer] optional CAS value for compare-and-swap
      # @return [Boolean] true if successful
      def delete_stale(key, cas = nil)
        wire_key, is_base64 = KeyRegularizer.encode(key)
        write(RequestFormatter.meta_delete(key: wire_key, cas: cas, base64: is_base64, stale: true))
        @connection_manager.flush
        response_processor.meta_delete
      end

      # Storage Commands

      # Writes a set request for +key+ and, unless in quiet mode, returns the result of
      # reading the set response (with CAS) from the response processor.
      def set(key, value, ttl, cas, options)
        write_storage_req(:set, key, value, ttl, cas, options)
        return if quiet?

        response_processor.meta_set_with_cas
      end

      # Pipelined set: writes a set request that is always quiet and carries no CAS value,
      # without reading any response. Used by PipelinedSetter for bulk operations.
      def pipelined_set(key, value, ttl, options)
        no_cas = nil
        write_storage_req(:set, key, value, ttl, no_cas, options, quiet: true)
      end

      # Writes an add request for +key+ (no CAS value) and, unless in quiet mode, returns
      # the result of reading the set response (with CAS) from the response processor.
      def add(key, value, ttl, options)
        no_cas = nil
        write_storage_req(:add, key, value, ttl, no_cas, options)
        return if quiet?

        response_processor.meta_set_with_cas
      end

      # Writes a replace request for +key+ and, unless in quiet mode, returns the result of
      # reading the set response (with CAS) from the response processor.
      def replace(key, value, ttl, cas, options)
        write_storage_req(:replace, key, value, ttl, cas, options)
        return if quiet?

        response_processor.meta_set_with_cas
      end

      # rubocop:disable Metrics/ParameterLists

      # Marshals +raw_value+, then writes the meta set header, the marshalled payload and
      # the terminator. The connection is flushed only when the request is not quiet.
      # +quiet+ defaults to the current quiet-mode setting.
      def write_storage_req(mode, key, raw_value, ttl = nil, cas = nil, options = {}, quiet: quiet?)
        payload, bitflags = @value_marshaller.store(key, raw_value, options)
        expiry = ttl && TtlSanitizer.sanitize(ttl)
        wire_key, is_base64 = KeyRegularizer.encode(key)
        header = RequestFormatter.meta_set(
          key: wire_key, value: payload, bitflags: bitflags, cas: cas,
          ttl: expiry, mode: mode, quiet: quiet, base64: is_base64
        )
        [header, payload, TERMINATOR].each { |chunk| write(chunk) }
        @connection_manager.flush unless quiet
      end
      # rubocop:enable Metrics/ParameterLists

      # Writes an append request for +key+ and, unless in quiet mode, returns the result of
      # reading the append/prepend response from the response processor.
      def append(key, value)
        write_append_prepend_req(:append, key, value)
        return if quiet?

        response_processor.meta_set_append_prepend
      end

      # Writes a prepend request for +key+ and, unless in quiet mode, returns the result of
      # reading the append/prepend response from the response processor.
      def prepend(key, value)
        write_append_prepend_req(:prepend, key, value)
        return if quiet?

        response_processor.meta_set_append_prepend
      end

      # rubocop:disable Metrics/ParameterLists

      # Writes the meta set header for an append/prepend, then +value+ as given (it is not
      # passed through the value marshaller here) and the terminator. The connection is
      # flushed only when not in quiet mode. The trailing options argument is unused.
      def write_append_prepend_req(mode, key, value, ttl = nil, cas = nil, _options = {})
        expiry = ttl && TtlSanitizer.sanitize(ttl)
        wire_key, is_base64 = KeyRegularizer.encode(key)
        header = RequestFormatter.meta_set(
          key: wire_key, value: value, base64: is_base64,
          cas: cas, ttl: expiry, mode: mode, quiet: quiet?
        )
        [header, value, TERMINATOR].each { |chunk| write(chunk) }
        @connection_manager.flush unless quiet?
      end
      # rubocop:enable Metrics/ParameterLists

      # Delete Commands

      # Writes a meta delete for +key+ (optionally guarded by +cas+). In quiet mode nothing
      # is flushed or read; otherwise the connection is flushed and the parsed delete
      # response is returned.
      def delete(key, cas)
        wire_key, is_base64 = KeyRegularizer.encode(key)
        write(RequestFormatter.meta_delete(key: wire_key, cas: cas, base64: is_base64, quiet: quiet?))
        return if quiet?

        @connection_manager.flush
        response_processor.meta_delete
      end

      # Pipelined delete: writes an always-quiet delete request for +key+ and reads no
      # response. Used by PipelinedDeleter for bulk operations.
      def pipelined_delete(key)
        wire_key, is_base64 = KeyRegularizer.encode(key)
        write(RequestFormatter.meta_delete(key: wire_key, base64: is_base64, quiet: true))
      end

      # Arithmetic Commands

      # Decrements the counter at +key+; delegates to decr_incr with the increment flag off.
      def decr(key, count, ttl, initial)
        increment = false
        decr_incr(increment, key, count, ttl, initial)
      end

      # Increments the counter at +key+; delegates to decr_incr with the increment flag on.
      def incr(key, count, ttl, initial)
        increment = true
        decr_incr(increment, key, count, ttl, initial)
      end

      # Shared implementation for incr/decr. Writes a meta arithmetic request; in quiet mode
      # nothing is flushed or read, otherwise the connection is flushed and the parsed
      # response is returned.
      def decr_incr(incr, key, delta, ttl, initial)
        # Only set a TTL if we want to set a value on miss
        expiry = (TtlSanitizer.sanitize(ttl) if initial)
        wire_key, is_base64 = KeyRegularizer.encode(key)
        request = RequestFormatter.meta_arithmetic(
          key: wire_key, delta: delta, initial: initial, incr: incr,
          ttl: expiry, quiet: quiet?, base64: is_base64
        )
        write(request)
        return if quiet?

        @connection_manager.flush
        response_processor.decr_incr
      end

      # Other Commands

      # Writes a flush request with the given +delay+. In quiet mode nothing is flushed to
      # the connection or read back; otherwise the parsed flush response is returned.
      def flush(delay = 0)
        request = RequestFormatter.flush(delay: delay)
        write(request)
        return if quiet?

        @connection_manager.flush
        response_processor.flush
      end

      # Noop serves as a keepalive and also marks the end of a set of pipelined commands,
      # so all outstanding responses are read together once it has been written.
      def noop
        write_noop
        processor = response_processor
        processor.consume_all_responses_until_mn
      end

      # Writes a stats request (optionally for the +info+ argument), flushes the connection
      # and returns the parsed stats response.
      def stats(info = nil)
        request = RequestFormatter.stats(info)
        write(request)
        @connection_manager.flush
        response_processor.stats
      end

      # Writes a stats request with the 'reset' argument, flushes the connection and
      # returns the parsed reset response.
      def reset_stats
        request = RequestFormatter.stats('reset')
        write(request)
        @connection_manager.flush
        response_processor.reset
      end

      # Writes a version request, flushes the connection and returns the parsed
      # version response.
      def version
        request = RequestFormatter.version
        write(request)
        @connection_manager.flush
        response_processor.version
      end

      # Writes a meta noop request and flushes the connection; no response is read here.
      def write_noop
        request = RequestFormatter.meta_noop
        write(request)
        @connection_manager.flush
      end

      # Single-server fast path for get_multi. Request formatting and response parsing are
      # inlined to minimize per-key overhead, avoiding the PipelinedGetter machinery
      # (IO.select, response buffering, server grouping).
      def read_multi_req(keys)
        raw = raw_mode?
        # Requests are formatted inline to avoid RequestFormatter.meta_get overhead per key.
        # Raw mode:    "mg <key> v k q s\r\n"   (no f flag, key at index 2 of the response)
        # Normal mode: "mg <key> v f k q s\r\n" (key at index 3 of the response)
        suffix = raw ? " v k q s\r\n" : " v f k q s\r\n"
        keys.each do |key|
          wire_key, is_base64 = KeyRegularizer.encode(key)
          base64_flag = is_base64 ? ' b' : ''
          write("mg #{wire_key}#{base64_flag}#{suffix}")
        end
        # The trailing noop marks the end of the batch for the response reader.
        write("mn\r\n")
        @connection_manager.flush

        read_multi_get_responses(raw)
      end

      # Reads response lines until a line starting with 'MN' (or a falsy read) and collects
      # the key/value pairs parsed from every 'VA ' line into a Hash. Other lines are skipped.
      def read_multi_get_responses(is_raw)
        results = {}
        # The key token sits one position earlier when no flags token is returned.
        key_position = is_raw ? 2 : 3
        while (line = @connection_manager.read_line) && !line.start_with?('MN')
          next unless line.start_with?('VA ')

          key, value = parse_multi_get_value(line, key_position, is_raw)
          results[key] = value if key
        end
        results
      end

      # Parses one 'VA' response: splits the header +line+ into tokens, reads the payload
      # whose byte size is given by the second token (plus the trailing terminator) and
      # unmarshals it.
      #
      # Non-destructive String#chomp is used on purpose: chomp! returns nil when nothing is
      # stripped (which would break the chained calls), mutates the caller's string and
      # raises on frozen input.
      #
      # @param line [String] the response header line
      # @param key_index [Integer] position of the key token within the header
      # @param is_raw [Boolean] when true no flags token is parsed and bitflags are 0
      # @return [Array, nil] [key, value], or nil when the header has no key token
      def parse_multi_get_value(line, key_index, is_raw)
        tokens = line.chomp(TERMINATOR).split
        # Always read the payload, even if the key turns out to be missing, so the
        # next read_line starts on a response header.
        value = @connection_manager.read(tokens[1].to_i + TERMINATOR.bytesize)&.chomp(TERMINATOR)
        raw_key = tokens[key_index]
        return unless raw_key

        # Drop the token's one-character prefix; a bare 'b' token marks a base64 key.
        key = KeyRegularizer.decode(raw_key[1..], tokens.include?('b'))
        bitflags = is_raw ? 0 : response_processor.bitflags_from_tokens(tokens)
        [key, @value_marshaller.retrieve(value, bitflags)]
      end

      # Single-server fast path for set_multi. Request formatting is inlined to minimize
      # per-key overhead, avoiding PipelinedSetter server grouping. Every set is quiet; a
      # final noop ends the batch and all responses are then consumed together.
      def write_multi_req(pairs, ttl, req_options)
        expiry = ttl && TtlSanitizer.sanitize(ttl)
        pairs.each do |key, raw_value|
          payload, bitflags = @value_marshaller.store(key, raw_value, req_options)
          wire_key, is_base64 = KeyRegularizer.encode(key)
          # Inline format: "ms <key> <size> c [b] F<flags> T<ttl> MS q\r\n"
          base64_flag = is_base64 ? ' b' : ''
          flags_token = bitflags ? " F#{bitflags}" : ''
          ttl_token = expiry ? " T#{expiry}" : ''
          write("ms #{wire_key} #{payload.bytesize} c#{base64_flag}#{flags_token}#{ttl_token} MS q\r\n")
          write(payload)
          write(TERMINATOR)
        end
        write_noop
        response_processor.consume_all_responses_until_mn
      end

      # Single-server fast path for delete_multi. One quiet delete is written per key,
      # followed by a noop, and then all responses are consumed together.
      def delete_multi_req(keys)
        keys.each do |key|
          wire_key, is_base64 = KeyRegularizer.encode(key)
          # Inline format: "md <key> [b] q\r\n"
          base64_flag = is_base64 ? ' b' : ''
          write("md #{wire_key}#{base64_flag} q\r\n")
        end
        write_noop
        response_processor.consume_all_responses_until_mn
      end

      require_relative 'key_regularizer'
      require_relative 'request_formatter'
      require_relative 'response_processor'
    end
  end
end