File: socket.rb

package info (click to toggle)
ruby-dalli 3.2.8-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 684 kB
  • sloc: ruby: 6,552; sh: 20; makefile: 4
file content (195 lines) | stat: -rw-r--r-- 7,145 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# frozen_string_literal: true

require 'openssl'
require 'rbconfig'

module Dalli
  ##
  # Various socket implementations used by Dalli.
  ##
  module Socket
    ##
    # Common methods for all socket implementations.
    ##
    module InstanceMethods
      def readfull(count)
        value = String.new(capacity: count + 1)
        loop do
          result = read_nonblock(count - value.bytesize, exception: false)
          value << result if append_to_buffer?(result)
          break if value.bytesize == count
        end
        value
      end

      def read_available
        value = +''
        loop do
          result = read_nonblock(8196, exception: false)
          break if WAIT_RCS.include?(result)
          raise Errno::ECONNRESET, "Connection reset: #{logged_options.inspect}" unless result

          value << result
        end
        value
      end

      WAIT_RCS = %i[wait_writable wait_readable].freeze

      def append_to_buffer?(result)
        raise Timeout::Error, "IO timeout: #{logged_options.inspect}" if nonblock_timed_out?(result)
        raise Errno::ECONNRESET, "Connection reset: #{logged_options.inspect}" unless result

        !WAIT_RCS.include?(result)
      end

      def nonblock_timed_out?(result)
        return true if result == :wait_readable && !wait_readable(options[:socket_timeout])

        # TODO: Do we actually need this?  Looks to be only used in read_nonblock
        result == :wait_writable && !wait_writable(options[:socket_timeout])
      end

      FILTERED_OUT_OPTIONS = %i[username password].freeze
      def logged_options
        options.reject { |k, _| FILTERED_OUT_OPTIONS.include? k }
      end
    end

    ##
    # Wraps the below TCP socket class in the case where the client
    # has configured a TLS/SSL connection between Dalli and the
    # Memcached server.
    ##
    class SSLSocket < ::OpenSSL::SSL::SSLSocket
      include Dalli::Socket::InstanceMethods
      def options
        io.options
      end

      unless method_defined?(:wait_readable)
        def wait_readable(timeout = nil)
          to_io.wait_readable(timeout)
        end
      end

      unless method_defined?(:wait_writable)
        def wait_writable(timeout = nil)
          to_io.wait_writable(timeout)
        end
      end
    end

    ##
    # A standard TCP socket between the Dalli client and the Memcached server.
    ##
    class TCP < TCPSocket
      include Dalli::Socket::InstanceMethods
      # options - supports enhanced logging in the case of a timeout
      attr_accessor :options

      def self.open(host, port, options = {})
        create_socket_with_timeout(host, port, options) do |sock|
          sock.options = { host: host, port: port }.merge(options)
          init_socket_options(sock, options)

          options[:ssl_context] ? wrapping_ssl_socket(sock, host, options[:ssl_context]) : sock
        end
      end

      def self.create_socket_with_timeout(host, port, options)
        # Check that TCPSocket#initialize was not overwritten by resolv-replace gem
        # (part of ruby standard library since 3.0.0, should be removed in 3.4.0),
        # as it does not handle keyword arguments correctly.
        # To check this we are using the fact that resolv-replace
        # aliases TCPSocket#initialize method to #original_resolv_initialize.
        # https://github.com/ruby/resolv-replace/blob/v0.1.1/lib/resolv-replace.rb#L21
        if RUBY_VERSION >= '3.0' &&
           !::TCPSocket.private_instance_methods.include?(:original_resolv_initialize)
          sock = new(host, port, connect_timeout: options[:socket_timeout])
          yield(sock)
        else
          Timeout.timeout(options[:socket_timeout]) do
            sock = new(host, port)
            yield(sock)
          end
        end
      end

      def self.init_socket_options(sock, options)
        sock.setsockopt(::Socket::IPPROTO_TCP, ::Socket::TCP_NODELAY, true)
        sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_KEEPALIVE, true) if options[:keepalive]
        sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_RCVBUF, options[:rcvbuf]) if options[:rcvbuf]
        sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_SNDBUF, options[:sndbuf]) if options[:sndbuf]

        return unless options[:socket_timeout]

        seconds, fractional = options[:socket_timeout].divmod(1)
        microseconds = fractional * 1_000_000
        # struct timeval is (time_t, suseconds_t), which translates to either (int32_t, long) or
        # (int64_t, long) depending on the architecture. For example, on Debian, all 64b
        # architectures have int64_t for time_t, but for 32b architectures, it depends:
        # armel and armhf use int64_t, while i386 stayed with int32_t.
        # Unfortunately Array::pack does not know about time_t. So we generate both candidates,
        # use getsockopt to get a timeval, and compare sizes.
        # Supported timeval formats:
        timeval_formats = [
          'q l_',       # 64-bit time_t, 64-bit long (e.g., amd64, arm64)
          'l l_',       # 32-bit time_t, 32-bit long (e.g., i386)
          'q l_ x4'     # 64-bit time_t, 32-bit long + padding (e.g., armel on Debian)
        ]

        timeval_args = [seconds, microseconds]
        expected_length = sock.getsockopt(::Socket::SOL_SOCKET, ::Socket::SO_RCVTIMEO).data.length
        timeval_format = timeval_formats.find do |fmt|
          timeval_args.pack(fmt).length == expected_length
        end
        raise Dalli::DalliError,"Unable to determine appropriate timeval format" unless timeval_format
        timeval = timeval_args.pack(timeval_format)
        sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_RCVTIMEO, timeval)
        sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_SNDTIMEO, timeval)
      end

      def self.wrapping_ssl_socket(tcp_socket, host, ssl_context)
        ssl_socket = Dalli::Socket::SSLSocket.new(tcp_socket, ssl_context)
        ssl_socket.hostname = host
        ssl_socket.sync_close = true
        ssl_socket.connect
        ssl_socket
      end
    end

    if /mingw|mswin/.match?(RbConfig::CONFIG['host_os'])
      ##
      # UNIX domain sockets are not supported on Windows platforms.
      ##
      class UNIX
        def initialize(*_args)
          raise Dalli::DalliError, 'Unix sockets are not supported on Windows platform.'
        end
      end
    else

      ##
      # UNIX represents a UNIX domain socket, which is an interprocess communication
      # mechanism between processes on the same host.  Used when the Memcached server
      # is running on the same machine as the Dalli client.
      ##
      class UNIX < UNIXSocket
        include Dalli::Socket::InstanceMethods

        # options - supports enhanced logging in the case of a timeout
        # server  - used to support IO.select in the pipelined getter
        attr_accessor :options

        def self.open(path, options = {})
          Timeout.timeout(options[:socket_timeout]) do
            sock = new(path)
            sock.options = { path: path }.merge(options)
            sock
          end
        end
      end
    end
  end
end