File: socket.rb

package info (click to toggle)
ruby-dalli 5.0.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 992 kB
  • sloc: ruby: 9,447; sh: 19; makefile: 4
file content (256 lines) | stat: -rw-r--r-- 9,805 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
# frozen_string_literal: true

require 'openssl'
require 'rbconfig'

module Dalli
  ##
  # Various socket implementations used by Dalli.
  ##
  module Socket
    ##
    # Common methods for all socket implementations.
    ##
    module InstanceMethods
      # Symbols that read_nonblock(..., exception: false) hands back instead of
      # data when the socket is not ready yet.
      WAIT_RCS = %i[wait_writable wait_readable].freeze

      # Option keys that are never included in error messages.
      FILTERED_OUT_OPTIONS = %i[username password].freeze

      # Reads exactly +count+ bytes, waiting on the socket as needed.
      #
      # Raises Timeout::Error when the socket does not become ready within
      # options[:socket_timeout], and Errno::ECONNRESET when a read returns
      # nil/false.
      def readfull(count)
        buffer = String.new(capacity: count + 1)
        loop do
          chunk = read_nonblock(count - buffer.bytesize, exception: false)
          buffer << chunk if append_to_buffer?(chunk)
          return buffer if buffer.bytesize == count
        end
      end

      # Drains whatever can be read right now without waiting and returns it
      # (an empty string when nothing was pending).
      #
      # Raises Errno::ECONNRESET when a read returns nil/false.
      def read_available
        buffer = +''
        loop do
          chunk = read_nonblock(8196, exception: false)
          case chunk
          when *WAIT_RCS
            return buffer
          when nil, false
            raise Errno::ECONNRESET, "Connection reset: #{logged_options.inspect}"
          else
            buffer << chunk
          end
        end
      end

      # Decides whether +result+ (a read_nonblock return value) is data that
      # belongs in the read buffer. Waits for readiness first when +result+ is
      # one of the wait symbols.
      #
      # Raises Timeout::Error if that wait times out, and Errno::ECONNRESET
      # when +result+ is nil/false.
      def append_to_buffer?(result)
        timed_out = nonblock_timed_out?(result)
        raise Timeout::Error, "IO timeout: #{logged_options.inspect}" if timed_out
        raise Errno::ECONNRESET, "Connection reset: #{logged_options.inspect}" unless result

        WAIT_RCS.none? { |wait_rc| wait_rc == result }
      end

      # For a wait symbol, blocks up to options[:socket_timeout] for the matching
      # readiness and returns true when that wait failed. Returns false for any
      # other +result+.
      def nonblock_timed_out?(result)
        limit_source = options
        case result
        when :wait_readable
          !wait_readable(limit_source[:socket_timeout])
        when :wait_writable
          # TODO: Do we actually need this?  Looks to be only used in read_nonblock
          !wait_writable(limit_source[:socket_timeout])
        else
          false
        end
      end

      # The socket options with the FILTERED_OUT_OPTIONS keys removed, used when
      # building error messages.
      def logged_options
        options.except(*FILTERED_OUT_OPTIONS)
      end
    end

    ##
    # Wraps the below TCP socket class in the case where the client
    # has configured a TLS/SSL connection between Dalli and the
    # Memcached server.
    ##
    class SSLSocket < ::OpenSSL::SSL::SSLSocket
      include Dalli::Socket::InstanceMethods

      # The options hash lives on the wrapped socket; expose it here so the
      # shared InstanceMethods can read it.
      def options
        io.options
      end

      # Provide wait_readable / wait_writable only when the OpenSSL superclass
      # does not already define them, delegating to the underlying IO.
      %i[wait_readable wait_writable].each do |waiter|
        next if method_defined?(waiter)

        define_method(waiter) do |timeout = nil|
          to_io.public_send(waiter, timeout)
        end
      end
    end

    ##
    # A standard TCP socket between the Dalli client and the Memcached server.
    ##
    class TCP < TCPSocket
      include Dalli::Socket::InstanceMethods

      # options - supports enhanced logging in the case of a timeout
      attr_accessor :options

      # Expected parameter signature for unmodified TCPSocket#initialize.
      # Used to detect when gems like socksify or resolv-replace have monkey-patched
      # TCPSocket, which breaks the connect_timeout: keyword argument.
      TCPSOCKET_NATIVE_PARAMETERS = [[:rest]].freeze
      private_constant :TCPSOCKET_NATIVE_PARAMETERS

      # Candidate Array#pack formats for struct timeval, tried in this order by
      # the SO_RCVTIMEO/SO_SNDTIMEO fallback in configure_timeout.
      #
      # struct timeval is (time_t, suseconds_t), which translates to either (int32_t, long) or
      # (int64_t, long) depending on the architecture. For example, on Debian, all 64b
      # architectures have int64_t for time_t, but for 32b architectures, it depends:
      # armel and armhf use int64_t, while i386 stayed with int32_t.
      # Unfortunately Array::pack does not know about time_t, so the candidates are
      # compared by packed size against what getsockopt reports.
      SOCKOPT_TIMEVAL_FORMATS = [
        'q l_',       # 64-bit time_t, 64-bit long (e.g., amd64, arm64)
        'l l_',       # 32-bit time_t, 32-bit long (e.g., i386)
        'q l_ x4'     # 64-bit time_t, 32-bit long + padding (e.g., armel on Debian)
      ].freeze
      private_constant :SOCKOPT_TIMEVAL_FORMATS

      # Connects to host:port, records the connection details on the socket
      # (they are used in error messages), applies the socket options and, when
      # options[:ssl_context] is set, wraps the connection in TLS.
      #
      # host    - server host
      # port    - server port
      # options - Hash; keys read by this class are :socket_timeout, :keepalive,
      #           :rcvbuf, :sndbuf and :ssl_context
      #
      # Returns the TCP socket, or a Dalli::Socket::SSLSocket wrapping it.
      def self.open(host, port, options = {})
        create_socket_with_timeout(host, port, options) do |sock|
          sock.options = { host: host, port: port }.merge(options)
          init_socket_options(sock, options)

          options[:ssl_context] ? wrapping_ssl_socket(sock, host, options[:ssl_context]) : sock
        end
      end

      # Detect and cache whether TCPSocket supports the connect_timeout: keyword argument.
      # Returns false if TCPSocket#initialize has been monkey-patched by gems like
      # socksify or resolv-replace, which don't support keyword arguments.
      # rubocop:disable ThreadSafety/ClassInstanceVariable
      def self.supports_connect_timeout?
        return @supports_connect_timeout if defined?(@supports_connect_timeout)

        @supports_connect_timeout = RUBY_VERSION >= '3.0' &&
                                    ::TCPSocket.instance_method(:initialize).parameters == TCPSOCKET_NATIVE_PARAMETERS
      end
      # rubocop:enable ThreadSafety/ClassInstanceVariable

      # Creates the socket, bounded by options[:socket_timeout], and yields it
      # to the caller's block for further setup.
      #
      # When connect_timeout: is usable only the connect itself is bounded;
      # otherwise the connect and the yielded setup both run inside
      # Timeout.timeout.
      #
      # If the block does not complete (it raises, or the timeout fires while it
      # is running) the freshly connected socket is closed before the error
      # propagates, so a failed setup does not leak an open descriptor.
      #
      # Returns whatever the block returns.
      def self.create_socket_with_timeout(host, port, options)
        if supports_connect_timeout?
          sock = new(host, port, connect_timeout: options[:socket_timeout])
          close_unless_completed(sock) { yield(sock) }
        else
          Timeout.timeout(options[:socket_timeout]) do
            sock = new(host, port)
            close_unless_completed(sock) { yield(sock) }
          end
        end
      end

      # Runs the block and returns its value; closes +sock+ when the block exits
      # in any way other than returning normally. An ensure clause is used
      # rather than rescue so that non-StandardError interruptions are covered too.
      def self.close_unless_completed(sock)
        result = yield
        completed = true
        result
      ensure
        sock.close unless completed
      end
      private_class_method :close_unless_completed

      # Applies the TCP flags, buffer sizes and timeout from +options+ to +sock+.
      def self.init_socket_options(sock, options)
        configure_tcp_options(sock, options)
        configure_socket_buffers(sock, options)
        configure_timeout(sock, options)
      end

      # Always sets TCP_NODELAY; sets SO_KEEPALIVE when options[:keepalive] is truthy.
      def self.configure_tcp_options(sock, options)
        sock.setsockopt(::Socket::IPPROTO_TCP, ::Socket::TCP_NODELAY, true)
        sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_KEEPALIVE, true) if options[:keepalive]
      end

      # Sets SO_RCVBUF / SO_SNDBUF from options[:rcvbuf] / options[:sndbuf] when given.
      def self.configure_socket_buffers(sock, options)
        sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_RCVBUF, options[:rcvbuf]) if options[:rcvbuf]
        sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_SNDBUF, options[:sndbuf]) if options[:sndbuf]
      end

      # Applies options[:socket_timeout] (seconds, may be fractional) to +sock+.
      # Does nothing when no timeout is configured.
      #
      # Raises Dalli::DalliError from the socket-option fallback when no known
      # struct timeval layout matches the platform.
      def self.configure_timeout(sock, options)
        socket_timeout = options[:socket_timeout]
        return unless socket_timeout

        if sock.respond_to?(:timeout=)
          # Ruby 3.2+ has IO#timeout for reliable cross-platform timeout handling
          sock.timeout = socket_timeout
        else
          # Ruby 3.1 fallback using socket options
          configure_sockopt_timeout(sock, socket_timeout)
        end
      end

      # Sets SO_RCVTIMEO and SO_SNDTIMEO to +socket_timeout+ seconds.
      #
      # struct timeval has architecture-dependent sizes (time_t, suseconds_t), so
      # the current SO_RCVTIMEO value is read back with getsockopt and the first
      # candidate format whose packed size matches it is used.
      def self.configure_sockopt_timeout(sock, socket_timeout)
        seconds, fractional = socket_timeout.divmod(1)
        timeval_args = [seconds, (fractional * 1_000_000).to_i]
        expected_length = sock.getsockopt(::Socket::SOL_SOCKET, ::Socket::SO_RCVTIMEO).data.length
        timeval_format = SOCKOPT_TIMEVAL_FORMATS.find do |fmt|
          timeval_args.pack(fmt).length == expected_length
        end
        raise Dalli::DalliError, 'Unable to determine appropriate timeval format' unless timeval_format

        timeval = timeval_args.pack(timeval_format)
        sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_RCVTIMEO, timeval)
        sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_SNDTIMEO, timeval)
      end
      private_class_method :configure_sockopt_timeout

      # Pack formats for struct timeval across architectures.
      # Uses fixed-size formats for JRuby compatibility (JRuby doesn't support _ modifier on q).
      # - ll: 8 bytes (32-bit time_t, 32-bit suseconds_t)
      # - qq: 16 bytes (64-bit time_t, 64-bit suseconds_t or padded 32-bit)
      #
      # NOTE: these constants and the two helpers below are not used by
      # configure_timeout in this class; they are kept as part of the existing
      # class interface.
      TIMEVAL_PACK_FORMATS = %w[ll qq].freeze
      TIMEVAL_TEST_VALUES = [0, 0].freeze

      # Detect and cache the correct pack format for struct timeval on this platform.
      # Different architectures have different sizes for time_t and suseconds_t.
      # Falls back to 'll' when no candidate matches the size getsockopt reports.
      # rubocop:disable ThreadSafety/ClassInstanceVariable
      def self.timeval_pack_format(sock)
        @timeval_pack_format ||= begin
          expected_size = sock.getsockopt(::Socket::SOL_SOCKET, ::Socket::SO_RCVTIMEO).data.bytesize
          TIMEVAL_PACK_FORMATS.find { |fmt| TIMEVAL_TEST_VALUES.pack(fmt).bytesize == expected_size } || 'll'
        end
      end
      # rubocop:enable ThreadSafety/ClassInstanceVariable

      # Packs seconds/microseconds using the format chosen by timeval_pack_format.
      def self.pack_timeval(sock, seconds, microseconds)
        [seconds, microseconds].pack(timeval_pack_format(sock))
      end

      # Wraps +tcp_socket+ in a Dalli::Socket::SSLSocket using +ssl_context+,
      # sets the hostname from +host+, and performs the TLS handshake.
      # sync_close is enabled so that closing the TLS socket also closes the
      # underlying TCP socket.
      def self.wrapping_ssl_socket(tcp_socket, host, ssl_context)
        ssl_socket = Dalli::Socket::SSLSocket.new(tcp_socket, ssl_context)
        ssl_socket.hostname = host
        ssl_socket.sync_close = true
        ssl_socket.connect
        ssl_socket
      end
    end

    if /mingw|mswin/.match?(RbConfig::CONFIG['host_os'])
      ##
      # UNIX domain sockets are not supported on Windows platforms.
      ##
      class UNIX
        # Always raises Dalli::DalliError; arguments are accepted and ignored.
        def initialize(*_args)
          raise Dalli::DalliError, 'Unix sockets are not supported on Windows platform.'
        end
      end
    else

      ##
      # UNIX represents a UNIX domain socket, which is an interprocess communication
      # mechanism between processes on the same host.  Used when the Memcached server
      # is running on the same machine as the Dalli client.
      ##
      class UNIX < UNIXSocket
        include Dalli::Socket::InstanceMethods

        # options - supports enhanced logging in the case of a timeout
        attr_accessor :options

        # Connects to the UNIX domain socket at +path+, records the connection
        # details on the socket and applies the socket options, all bounded by
        # options[:socket_timeout].
        #
        # If setup does not complete after the connection was made (an option
        # call raises, or the timeout fires) the socket is closed before the
        # error propagates, so a failed open does not leak an open descriptor.
        #
        # Returns the connected socket.
        def self.open(path, options = {})
          Timeout.timeout(options[:socket_timeout]) do
            sock = new(path)
            begin
              sock.options = { path: path }.merge(options)
              init_socket_options(sock, options)
              configured = true
              sock
            ensure
              # ensure (not rescue) so the Timeout interruption is covered as well
              sock.close unless configured
            end
          end
        end

        # Sets SO_SNDBUF from options[:sndbuf] when given, and the IO timeout
        # from options[:socket_timeout] when the socket supports #timeout=.
        def self.init_socket_options(sock, options)
          # https://man7.org/linux/man-pages/man7/unix.7.html
          # NOTE(review): only the send buffer is configured here; presumably because,
          # per that man page, SO_RCVBUF has no effect on UNIX domain sockets.
          sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_SNDBUF, options[:sndbuf]) if options[:sndbuf]
          sock.timeout = options[:socket_timeout] if options[:socket_timeout] && sock.respond_to?(:timeout=)
        end
      end
    end
  end
end