File: base.rb

package info (click to toggle)
ruby-dalli 5.0.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 992 kB
  • sloc: ruby: 9,447; sh: 19; makefile: 4
file content (320 lines) | stat: -rw-r--r-- 11,461 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
# frozen_string_literal: true

require 'forwardable'
require 'socket'
require 'timeout'

module Dalli
  module Protocol
    ##
    # Base class for a single Memcached server, containing logic common to all
    # protocols.  Contains logic for managing connection state to the server and value
    # handling.
    ##
    class Base
      extend Forwardable

      # :weight — relative weight of this server (parsed from the server
      # string); presumably used for weighted server selection — confirm in
      # the ring/client code.  :options — merged client options for this server.
      attr_accessor :weight, :options

      # Value handling (serialization/compression settings) is delegated to
      # the marshaller; socket lifecycle, liveness state, and raw writes are
      # delegated to the connection manager.
      def_delegators :@value_marshaller, :serializer, :compressor, :compression_min_size, :compress_by_default?
      def_delegators :@connection_manager, :name, :sock, :hostname, :port, :close, :connected?, :socket_timeout,
                     :socket_type, :up!, :down!, :write, :reconnect_down_server?, :raise_down_error

      # attribs is a server definition string (host, port, weight, etc.);
      # client_options are the Dalli::Client options relevant to this server.
      def initialize(attribs, client_options = {})
        hostname, port, socket_type, @weight, user_creds = ServerConfigParser.parse(attribs)
        # Credentials embedded in a memcached:// URI are ignored in Dalli 5;
        # tell the user rather than failing silently.
        warn_uri_credentials(user_creds)

        @options = client_options.merge(user_creds)
        @raw_mode = client_options[:raw]

        # Raw mode bypasses serialization/compression entirely.
        marshaller_class = @raw_mode ? StringMarshaller : ValueMarshaller
        @value_marshaller = marshaller_class.new(@options)
        @connection_manager = ConnectionManager.new(hostname, port, socket_type, @options)
      end

      # Returns truthy if client is in raw mode (no serialization/compression).
      # In raw mode, we can skip requesting bitflags from the server.
      # NOTE: returns the :raw option's value as-is, so this may be nil
      # (falsy) rather than false when the option was not supplied.
      def raw_mode?
        @raw_mode
      end

      # Chokepoint method for error handling and ensuring liveness.
      # Dispatches opkey to the protocol-specific implementation via send.
      def request(opkey, *args)
        verify_state(opkey)

        # The begin block deliberately excludes verify_state: its failures are
        # not logged/closed here.
        begin
          @connection_manager.start_request!
          result = send(opkey, *args)

          # The pipelined get operations only emit the query; their responses
          # are consumed later through the pipeline_* API, so the request
          # stays open for them.
          reads_pending = opkey == :pipelined_get || opkey == :pipelined_get_interleaved
          @connection_manager.finish_request! unless reads_pending

          result
        rescue Dalli::MarshalError => e
          log_marshal_err(args.first, e)
          raise
        rescue Dalli::DalliError
          raise
        rescue StandardError => e
          # Unexpected failures leave the connection in an unknown state, so
          # close it before re-raising.
          log_unexpected_err(e)
          close
          raise
        end
      end

      ##
      # Boolean method used by clients of this class to determine if this
      # particular memcached instance is available for use.  A failed
      # connection attempt surfaces as Dalli::NetworkError, which is mapped
      # to false here instead of propagating.
      def alive?
        begin
          ensure_connected!
        rescue Dalli::NetworkError
          false
        end
      end

      # Connection-locking hooks; no-ops here.  Thread-safe wrappers are
      # expected to override these (cf. the Dalli::Threadsafe note below).
      def lock!; end

      def unlock!; end

      # Start reading key/value pairs from this connection. This is usually called
      # after a series of GETKQ commands. A NOOP is sent, and the server begins
      # flushing responses for kv pairs that were found.
      #
      # Raises (via verify_pipelined_state) if no pipelined request is
      # currently in progress on this connection.
      #
      # Returns nothing.
      def pipeline_response_setup
        verify_pipelined_state(:getkq)
        write_noop
        # Use ensure_ready instead of reset to preserve any data already buffered
        # during interleaved pipelined get draining
        response_buffer.ensure_ready
      end

      # Attempt to receive and parse as many key/value pairs as possible
      # from this server. After #pipeline_response_setup, this should be invoked
      # repeatedly whenever this server's socket is readable until
      # #pipeline_complete?.
      #
      # When a block is given, yields (key, value, cas) for each response,
      # avoiding intermediate Hash allocation, and returns an empty Hash.
      # Without a block, returns a Hash of { key => [value, cas] }.
      # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
      def pipeline_next_responses(&block)
        reconnect_on_pipeline_complete!
        values = nil

        response_buffer.read

        status, cas, key, value = response_buffer.process_single_getk_response
        # status is not nil only if we have a full response to parse
        # in the buffer
        until status.nil?
          # A truthy (ok) status with a nil key is the response to the noop
          # written at the end of the pipeline, so the pipeline is finished
          # (finish_pipeline returns true, making the && break fire)
          finish_pipeline && break if status && key.nil?

          # A non-nil key is a getkq response with a value; either yield it
          # to the caller's block or accumulate it in the response hash
          unless key.nil?
            if block
              yield key, value, cas
            else
              values ||= {}
              values[key] = [value, cas]
            end
          end

          # Get the next response from the buffer
          status, cas, key, value = response_buffer.process_single_getk_response
        end

        values || {}
      rescue SystemCallError, *TIMEOUT_ERRORS, *SSL_ERRORS, EOFError => e
        # error_on_request! marks the server down and re-raises as a Dalli error
        @connection_manager.error_on_request!(e)
      end
      # rubocop:enable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity

      # Abort current pipelined get. Generally used to signal an external
      # timeout during pipelined get.  The underlying socket is
      # disconnected, and the exception is swallowed.
      #
      # Returns true.
      def pipeline_abort
        response_buffer.clear
        @connection_manager.abort_request!
        return true unless connected?

        # Closes the connection, which ensures that our connection
        # is in a clean state for future requests.  This raises a
        # NetworkError, which the rescue below swallows.
        @connection_manager.error_on_request!('External timeout')
      rescue NetworkError
        true
      end

      # True when no pipelined get is currently in progress on this
      # connection's response buffer — i.e. either no pipeline was started or
      # the terminating noop response has been consumed.
      def pipeline_complete?
        !response_buffer.in_progress?
      end

      # Truthy when the current thread is inside a quiet/multi block, as
      # signalled by the ::Dalli::QUIET thread-local.  Returns the
      # thread-local's value itself, not a strict boolean.
      def quiet?
        Thread.current[::Dalli::QUIET]
      end
      alias multi? quiet?

      # NOTE: Additional public methods should be overridden in Dalli::Threadsafe

      private

      # Warning logged during server setup when a memcached:// URI carries
      # credentials; Dalli 5 dropped SASL support, so they are ignored.
      URI_CREDENTIAL_WARNING = 'Dalli 5.0 removed SASL authentication. ' \
                               'Credentials in memcached:// URIs are ignored.'
      private_constant :URI_CREDENTIAL_WARNING

      # Log URI_CREDENTIAL_WARNING whenever either credential field of the
      # parsed server config carries a value; stay silent when both are nil.
      def warn_uri_credentials(user_creds)
        return if user_creds.values_at(:username, :password).all?(&:nil?)

        Dalli.logger.warn(URI_CREDENTIAL_WARNING)
      end

      # Operations permitted inside a quiet/multi block (checked by
      # verify_allowed_quiet!); other ops raise NotPermittedMultiOpError.
      ALLOWED_QUIET_OPS = %i[add replace set delete incr decr append prepend flush noop].freeze
      private_constant :ALLOWED_QUIET_OPS

      # Raises Dalli::NotPermittedMultiOpError unless opkey is one of the
      # operations permitted inside a quiet block (ALLOWED_QUIET_OPS).
      def verify_allowed_quiet!(opkey)
        unless ALLOWED_QUIET_OPS.include?(opkey)
          raise Dalli::NotPermittedMultiOpError, "The operation #{opkey} is not allowed in a quiet block."
        end
      end

      ##
      # Checks to see if we can execute the specified operation.  Checks
      # whether the connection is in use, and whether the command is allowed
      # in the current (quiet or normal) mode.  Raises
      # NotPermittedMultiOpError for disallowed ops inside a quiet block.
      ##
      def verify_state(opkey)
        @connection_manager.confirm_ready!
        verify_allowed_quiet!(opkey) if quiet?

        # The ensure_connected call has the side effect of connecting the
        # underlying socket if it is not connected, or there's been a disconnect
        # because of timeout or other error.  Method raises an error
        # if it can't connect
        raise_down_error unless ensure_connected!
      end

      # Confirms that a pipelined request is in progress on this connection
      # and the socket is still up; raises (via raise_down_error) otherwise.
      # The opkey argument is currently unused — kept for signature parity
      # with verify_state.
      def verify_pipelined_state(_opkey)
        @connection_manager.confirm_in_progress!
        raise_down_error unless connected?
      end

      # The socket connection to the underlying server is initialized as a side
      # effect of this call.  In fact, this is the ONLY place where that
      # socket connection is initialized.
      #
      # Both this method and connect need to be in this class so we can do auth
      # as required
      #
      # Since this is invoked exclusively in verify_state!, we don't need to worry about
      # thread safety.  Using it elsewhere may require revisiting that assumption.
      #
      # Returns true when connected afterwards, false when the server is down
      # and not yet eligible for a reconnect attempt.
      def ensure_connected!
        unless connected?
          # A down server is only retried after its retry window elapses;
          # until then, report unavailable without touching the socket.
          return false unless reconnect_down_server?

          connect # This call needs to be in this class so we can do auth
        end
        connected?
      end

      # Strict-boolean view of the :cache_nils option.  Anything that is not
      # a Hash (nil, a TTL integer, etc.) cannot carry the option and yields
      # false; otherwise the option's truthiness is coerced to true/false.
      def cache_nils?(opts)
        !!(opts.is_a?(Hash) && opts[:cache_nils])
      end

      # Establishes the socket connection, caches the server version, and
      # marks the server up.  `version` is implemented by the protocol
      # subclass (not visible in this file) — NOTE(review): it appears to
      # double as the initial round-trip/liveness probe; confirm against the
      # protocol implementations.
      def connect
        @connection_manager.establish_connection
        @version = version
        up!
      end

      # Writes one quiet getkq request per key in a single socket write.
      # The terminating noop is emitted later by pipeline_response_setup.
      def pipelined_get(keys)
        # Clear buffer to remove any stale data from interrupted operations.
        # Use clear (not reset) to keep pipeline_complete? = true, which is
        # the expected state before pipeline_response_setup is called.
        response_buffer.clear

        payload = keys.each_with_object(+'') do |key, buf|
          buf << quiet_get_request(key)
        end
        write(payload)
      end

      # For large batches, interleave writing requests with draining responses.
      # This prevents socket buffer deadlock when sending many keys.
      # Populates the provided results hash with any responses drained during send.
      def pipelined_get_interleaved(keys, chunk_size, results)
        # Make the buffer usable for draining while we are still writing.
        response_buffer.ensure_ready

        keys.each_slice(chunk_size) do |batch|
          payload = batch.map { |key| quiet_get_request(key) }.join
          write(payload)
          @connection_manager.flush

          # Pull any responses the server has already produced straight into
          # results, freeing socket buffer space for the next batch.
          drain_pipeline_responses(results)
        end
      end

      # Non-blocking read and processing of any available pipeline responses.
      # Used during interleaved pipelined gets to prevent buffer deadlock.
      # Populates the provided results hash directly to avoid allocation overhead.
      def drain_pipeline_responses(results)
        # Only proceed when the socket is up and has readable data right now
        # (wait_readable(0) is a non-blocking poll).
        return unless connected? && sock.wait_readable(0)

        response_buffer.read

        # Consume every complete getkq response currently in the buffer; a nil
        # status means no further complete response is available.  Responses
        # with a nil key (e.g. the pipeline-terminating noop) are skipped.
        status, cas, key, value = response_buffer.process_single_getk_response
        until status.nil?
          results[key] = [value, cas] unless key.nil?
          status, cas, key, value = response_buffer.process_single_getk_response
        end
      rescue SystemCallError, Dalli::NetworkError
        # Ignore errors during drain - they'll be handled in fetch_responses
        nil
      end

      # Lazily-built buffer that accumulates bytes read from the socket and
      # parses pipelined getk responses using the protocol-specific
      # response_processor (defined by the subclass).
      def response_buffer
        @response_buffer ||= ResponseBuffer.new(@connection_manager, response_processor)
      end

      # Called after the noop response is received at the end of a set
      # of pipelined gets.  Clears the response buffer and releases the
      # connection manager's in-progress request state.
      def finish_pipeline
        response_buffer.clear
        @connection_manager.finish_request!

        true # so callers can use `finish_pipeline && break`
      end

      # If no pipelined get is in flight, force a reconnect so the next
      # pipelined get starts from a known-clean connection state.
      def reconnect_on_pipeline_complete!
        return unless pipeline_complete?

        @connection_manager.reconnect! 'pipelined get has completed'
      end

      # Logs a serialization failure for the given key along with a hint
      # about the likely cause (an unserializable Ruby object).
      def log_marshal_err(key, err)
        logger = Dalli.logger
        logger.error "Marshalling error for key '#{key}': #{err.message}"
        logger.error 'You are trying to cache a Ruby object which cannot be serialized to memcached.'
      end

      # Logs an unexpected (non-Dalli) exception with its class, message,
      # and tab-indented backtrace.
      def log_unexpected_err(err)
        logger = Dalli.logger
        logger.error "Unexpected exception during Dalli request: #{err.class.name}: #{err.message}"
        logger.error err.backtrace.join("\n\t")
      end
    end
  end
end