File: system.rb

package info (click to toggle)
ruby-httpx 1.7.2-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,816 kB
  • sloc: ruby: 12,209; makefile: 4
file content (279 lines) | stat: -rw-r--r-- 7,262 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
# frozen_string_literal: true

require "resolv"

module HTTPX
  # Implementation of a synchronous name resolver which relies on the system resolver,
  # i.e. libc's getaddrinfo function (abstracted in ruby via Addrinfo.getaddrinfo).
  #
  # Its main advantage is relying on the reference implementation for name resolution
  # across most/all OSs which deploy ruby (it's what TCPSocket also uses); its main
  # disadvantage is the inability to set timeouts / check the socket for readiness events,
  # which is why it relies on the Timeout module, which poses a lot of problems for
  # the selector loop, especially when the network is unstable.
  #
  class Resolver::System < Resolver::Resolver
    using URIExtensions

    # Exception classes coming from the stdlib Resolv library.
    # NOTE(review): nothing in the visible source references this constant;
    # presumably it is kept for code outside this file - confirm before removing.
    RESOLV_ERRORS = [Resolv::ResolvError,
                     Resolv::DNS::Requester::RequestError,
                     Resolv::DNS::EncodeError,
                     Resolv::DNS::DecodeError].freeze

    # Single-byte event codes. The lookup thread writes one of them to the pipe
    # (see #async_resolve) for every entry it pushes to @ips, and #consume decodes
    # it to know whether that entry carries addresses (DONE) or an exception (ERROR).
    DONE = 1
    ERROR = 2

    # Class-level flag; always false for this resolver.
    # NOTE(review): presumably consulted to decide whether one resolver per
    # record type is needed - confirm against the callers.
    def self.multi?
      false
    end

    # Current lifecycle state. It starts as :idle (see #initialize) and is only
    # changed through #transition, which gives special treatment to :idle, :open
    # and :closed.
    attr_reader :state

    # Builds a resolver in the :idle state, with no pending queries or results.
    #
    # The resolve timeouts come from the +:timeouts+ entry of the resolver options
    # (a single value or a list), falling back to Resolver::RESOLVE_TIMEOUT.
    def initialize(options)
      super(0, options)
      @state = :idle
      @queries = []
      @ips = []
      @pipe_mutex = Thread::Mutex.new

      @resolver_options = @options.resolver_options
      # work on a copy, so that the original resolver options are left untouched
      overrides = @resolver_options.dup
      @_timeouts = Array(overrides.delete(:timeouts) || Resolver::RESOLVE_TIMEOUT)
      overrides.delete(:cache)

      # each host lazily receives its own copy of the timeout list
      @timeouts = Hash.new { |per_host, host| per_host[host] = @_timeouts.dup }
    end

    # Yields this resolver (the only one there is) to the given block and returns
    # the block's value; without a block, returns an Enumerator over the same
    # one-element sequence.
    def resolvers
      if block_given?
        yield self
      else
        enum_for(__method__)
      end
    end

    # Returns the resolver itself.
    # NOTE(review): presumably mirrors the accessor other resolvers use to reach
    # their multi-resolver wrapper - confirm against the callers.
    def multi
      self
    end

    # Whether @connections holds no connection, i.e. nothing is waiting for
    # resolution.
    def empty?
      @connections.empty?
    end

    # Requests the :closed state. Per #transition, this only takes effect while the
    # resolver is :open, in which case both ends of the pipe get closed.
    def close
      transition(:closed)
    end

    # Closes the resolver and throws away all of its bookkeeping (pending queries,
    # per-host timeouts, buffered results), then defers to the parent
    # implementation with the same arguments.
    def force_close(*)
      close
      [@queries, @timeouts, @ips].each(&:clear)
      super
    end

    # Whether the resolver is in the :closed state.
    def closed?
      @state == :closed
    end

    # Exposes the read end of the pipe as the IO standing for this resolver.
    # @pipe_read is only assigned when transitioning to :open, so calling this on
    # a resolver that was never opened fails with NoMethodError.
    # NOTE(review): presumably what the selector waits on - confirm against caller.
    def to_io
      @pipe_read.to_io
    end

    # Runs #consume when the resolver is :open and does nothing in any other
    # state. Always returns nil.
    def call
      consume if @state == :open
      nil
    end

    # Readiness interest of the resolver: :r (readable) while queries are pending,
    # nil otherwise.
    def interests
      :r unless @queries.empty?
    end

    # Next timeout (in seconds) applying to the connection of the first pending
    # query. Returns nil when nothing is being queried, or when the host of that
    # connection has no timeouts left.
    def timeout
      # queries are [family, connection] pairs; pick the connection of the first one
      connection = @queries.dig(0, 1)
      return unless connection

      remaining = @timeouts[connection.peer.host]
      return if remaining.empty?

      log(level: 2) { "resolver #{FAMILY_TYPES[@record_type]}: next timeout #{remaining.first} secs... (#{remaining.size - 1} left)" }

      remaining.first
    end

    # Queues +connection+ and starts resolving. If connections are still waiting
    # once #resolve returns, the resolver registers itself with the current
    # session/selector (and that call's result is returned); otherwise returns nil.
    def lazy_resolve(connection)
      @connections.push(connection)
      resolve

      @current_session.select_resolver(self, @current_selector) unless empty?
    end

    # Intentionally a no-op: accepts a connection and arbitrary keyword arguments,
    # ignores them and returns nil.
    # NOTE(review): presumably overrides a parent implementation to opt out of
    # early (e.g. cached) resolution - confirm against Resolver::Resolver.
    def early_resolve(connection, **); end

    # Fails every known connection with a ResolveTimeoutError built for
    # +interval+, then lets #close_or_resolve decide what happens next.
    #
    # Connections with a query in flight are notified first (once each, as they
    # are removed from @connections on the way), followed by whatever is still
    # queued. All of them receive the very same error object.
    def handle_socket_timeout(interval)
      error = HTTPX::ResolveTimeoutError.new(interval, "timed out while waiting on select")
      error.set_backtrace(caller)

      @queries.each do |_family, conn| # rubocop:disable Style/HashEachMethods
        next unless @connections.delete(conn)

        emit_resolve_error(conn, conn.peer.host, error)
      end

      while (queued = @connections.shift)
        emit_resolve_error(queued, queued.peer.host, error)
      end

      close_or_resolve
    end

    private

    # Moves the resolver to +nextstate+, running the side effect tied to it:
    #
    # * :idle   - forgets the per-host timeouts;
    # * :open   - only allowed from :idle; creates the pipe;
    # * :closed - only allowed from :open; closes both ends of the pipe.
    #
    # A disallowed :open / :closed request returns nil and leaves @state alone.
    # Any other value is simply stored in @state. Returns +nextstate+ on success.
    def transition(nextstate)
      return if nextstate == :open && @state != :idle
      return if nextstate == :closed && @state != :open

      case nextstate
      when :idle
        @timeouts.clear
      when :open
        @pipe_read, @pipe_write = IO.pipe
      when :closed
        @pipe_write.close
        @pipe_read.close
      end

      @state = nextstate
    end

    # Reads (without blocking) one event byte from the pipe and handles the
    # matching entry buffered in @ips by the lookup thread:
    #
    # * DONE  - the entry is [family, connection, addresses]; the addresses are
    #           emitted for the connection;
    # * ERROR - the entry is [family, connection, exception]; the error is
    #           emitted for the connection.
    #
    # In both cases the [family, connection] query and the connection itself are
    # dropped from the pending lists. Afterwards the resolver either disconnects
    # (no connection left) or moves on to the next resolution.
    #
    # Returns nil when there is nothing to do (no connections, or no event ready).
    # Any StandardError, including the ResolveError raised when the pipe was
    # closed on the other end, is handed to #on_error.
    def consume
      return if @connections.empty?

      event = @pipe_read.read_nonblock(1, exception: false)

      return if event == :wait_readable

      raise ResolveError, "socket pipe closed unexpectedly" if event.nil?

      case event.unpack1("C")
      when DONE
        *pair, addrs = @pipe_mutex.synchronize { @ips.pop }
        # a splat target is always an Array (empty when nothing was buffered), so
        # it has to be checked for emptiness rather than for truthiness
        unless pair.empty?
          @queries.delete(pair)
          family, connection = pair
          @connections.delete(connection)

          catch(:coalesced) { emit_addresses(connection, family, addrs) }
        end
      when ERROR
        *pair, error = @pipe_mutex.synchronize { @ips.pop }
        if !pair.empty? && error
          @queries.delete(pair)
          _, connection = pair
          @connections.delete(connection)

          emit_resolve_error(connection, connection.peer.host, error)
        end
      end

      return disconnect if @connections.empty?

      resolve
    rescue StandardError => e
      on_error(e)
    end

    # Starts resolving +connection+ (by default the first queued connection that
    # is not closed) under +hostname+ (by default the host of its peer).
    #
    # Does nothing and returns nil while queries are still pending. Otherwise it
    # opens the resolver, records one [family, connection] query per IP family,
    # kicks off the background lookup and immediately tries to consume a result.
    #
    # Raises Error when there is no connection to resolve.
    def resolve(connection = nil, hostname = nil)
      # closed connections at the head of the queue are not worth resolving
      @connections.shift while !@connections.empty? && @connections.first.state == :closed

      connection ||= @connections.first

      raise Error, "no URI to resolve" unless connection

      return unless @queries.empty?

      hostname ||= connection.peer.host
      scheme = connection.origin.scheme

      if connection.peer.non_ascii_hostname
        log { "resolver: resolve IDN #{connection.peer.non_ascii_hostname} as #{hostname}" }
      end

      transition(:open)

      families = connection.options.ip_families || Resolver.supported_ip_families
      @queries.concat(families.map { |family| [family, connection] })

      async_resolve(connection, hostname, scheme)
      consume
    end

    # Resolves +hostname+ on a separate thread (the system lookup blocks) and
    # reports back through @ips and the pipe.
    #
    # On success, one [family, connection, ip_addresses] entry plus a DONE byte is
    # published for each requested family that got addresses. On failure, one
    # [family, connection, exception] entry plus an ERROR byte is published for
    # every requested family.
    #
    # Each attempt is bound by the first timeout left for the host. When it
    # expires, that timeout is discarded and the lookup is retried with the next
    # one. Once none is left, the failure is reported as a ResolveTimeoutError
    # carrying the last timeout tried and the backtrace of the original
    # Timeout::Error. With no timeout configured the lookup runs unbounded.
    def async_resolve(connection, hostname, scheme)
      families = connection.options.ip_families || Resolver.supported_ip_families
      log { "resolver: query for #{hostname}" }
      timeouts = @timeouts[connection.peer.host]

      Thread.start do
        Thread.current.report_on_exception = false
        resolve_timeout = nil
        begin
          # read again on every attempt, so that a retry uses the next timeout
          # instead of repeating the first one
          resolve_timeout = timeouts.first
          addrs = if resolve_timeout
            Timeout.timeout(resolve_timeout) do
              __addrinfo_resolve(hostname, scheme)
            end
          else
            __addrinfo_resolve(hostname, scheme)
          end
          addrs = addrs.sort_by(&:afamily).group_by(&:afamily)
          families.each do |family|
            addresses = addrs[family]
            next unless addresses

            addresses.map!(&:ip_address)
            addresses.uniq!
            @pipe_mutex.synchronize do
              @ips.unshift([family, connection, addresses])
              @pipe_write.putc(DONE) unless @pipe_write.closed?
            end
          end
        rescue StandardError => e
          if e.is_a?(Timeout::Error)
            timeouts.shift
            retry unless timeouts.empty?
            # build the replacement under a different name, otherwise the
            # backtrace of the original error is lost before it can be copied
            timeout_error = ResolveTimeoutError.new(resolve_timeout, e.message)
            timeout_error.set_backtrace(e.backtrace)
            e = timeout_error
          end
          @pipe_mutex.synchronize do
            families.each do |family|
              @ips.unshift([family, connection, e])
              @pipe_write.putc(ERROR) unless @pipe_write.closed?
            end
          end
        end
      end
      Thread.pass
    end

    # Decides what to do once the current batch was dealt with: disconnect when
    # every remaining connection already has a query in flight (or none remains),
    # otherwise resolve the next one.
    def close_or_resolve
      # drop already closed connections
      @connections.shift while !@connections.empty? && @connections.first.state == :closed

      in_flight = @queries.map(&:last)
      return disconnect if (@connections - in_flight).empty?

      resolve
    end

    # Blocking lookup through the system resolver: asks for stream-socket
    # addresses of any family (AF_UNSPEC) for +host+, passing +scheme+ as the
    # service name. Returns whatever Addrinfo.getaddrinfo returns; errors raised
    # by it are not handled here.
    def __addrinfo_resolve(host, scheme)
      Addrinfo.getaddrinfo(host, scheme, Socket::AF_UNSPEC, Socket::SOCK_STREAM)
    end
  end
end