1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279
|
# frozen_string_literal: true
require "resolv"
module HTTPX
# Implementation of a synchronous name resolver which relies on the system resolver,
# which is libc's getaddrinfo function (abstracted in ruby via Addrinfo.getaddrinfo).
#
# Its main advantage is relying on the reference implementation for name resolution
# across most/all OSs which deploy ruby (it's what TCPSocket also uses); its main
# disadvantage is the inability to set timeouts / check the socket for readiness events,
# which is why it relies on the Timeout module, which poses a lot of problems for
# the selector loop, especially when the network is unstable.
#
class Resolver::System < Resolver::Resolver
using URIExtensions
# Error classes from the Resolv library. Not referenced by the methods visible
# in this class body.
RESOLV_ERRORS = [Resolv::ResolvError,
Resolv::DNS::Requester::RequestError,
Resolv::DNS::EncodeError,
Resolv::DNS::DecodeError].freeze
# Event bytes written to the pipe by the resolving thread (#async_resolve) and
# decoded in #consume: DONE announces a queued list of addresses, ERROR a queued exception.
DONE = 1
ERROR = 2
# Class-level flag; always false for this resolver.
def self.multi?
  false
end
# Lifecycle state of the resolver: starts as :idle and is changed by #transition
# (which handles :idle, :open and :closed).
attr_reader :state
# Sets up the resolver's bookkeeping: pending queries, resolved results,
# the mutex guarding those results, and the per-host timeout lists.
#
# @param options options object; after the superclass initializer has run,
#   @options.resolver_options is read for the :timeouts entry
def initialize(options)
  super(0, options)
  @resolver_options = @options.resolver_options

  # operate on a copy, so the shared resolver options are never mutated
  settings = @resolver_options.dup
  configured = settings.delete(:timeouts)
  settings.delete(:cache)

  @_timeouts = Array(configured || Resolver::RESOLVE_TIMEOUT)
  # every host gets its own copy of the timeout list, created on first access
  @timeouts = Hash.new { |per_host, host| per_host[host] = @_timeouts.dup }
  @pipe_mutex = Thread::Mutex.new
  @queries = []
  @ips = []
  @state = :idle
end
# Yields this resolver (the only one there is) to the block.
#
# @return [Enumerator] when called without a block; otherwise the block's result
def resolvers
  if block_given?
    yield self
  else
    enum_for(__method__)
  end
end
# @return [self] the receiver itself
def multi
  self
end
# @return [Boolean] whether no connections are waiting on this resolver
def empty?
  @connections.size.zero?
end
# Requests the move to the :closed state; #transition ignores the request
# unless the resolver is currently :open.
def close
  transition(:closed)
end
# Closes the resolver and drops every pending query, per-host timeout list and
# queued result before handing over to the superclass implementation.
def force_close(*)
  close
  [@queries, @timeouts, @ips].each(&:clear)
  super
end
# @return [Boolean] whether the resolver is in the :closed state
def closed?
  :closed == @state
end
# Exposes the read end of the pipe, which is where the resolving thread's
# DONE/ERROR bytes arrive. The pipe is only created on the transition to :open.
def to_io
  reader = @pipe_read
  reader.to_io
end
# Processes pending pipe events, but only while the resolver is :open.
#
# @return [nil] always
def call
  consume if @state == :open
  nil
end
# @return [Symbol, nil] :r (readable) while queries are pending, nil otherwise
def interests
  @queries.empty? ? nil : :r
end
# Reports how long to wait for the oldest pending query.
#
# @return [Numeric, nil] the first remaining timeout for the host of the first
#   queued query; nil when nothing is queued or that host has no timeouts left
def timeout
  head = @queries.first
  connection = head && head[1]
  return unless connection

  remaining = @timeouts[connection.peer.host]
  return if remaining.empty?

  log(level: 2) { "resolver #{FAMILY_TYPES[@record_type]}: next timeout #{remaining.first} secs... (#{remaining.size - 1} left)" }
  remaining.first
end
# Queues +connection+ for resolution and starts resolving. If connections are
# still pending afterwards, hands this resolver to the current session via
# select_resolver, together with the current selector.
#
# @param connection the connection whose peer host must be resolved
def lazy_resolve(connection)
  @connections.push(connection)
  resolve
  @current_session.select_resolver(self, @current_selector) unless empty?
end
# Intentionally does nothing and returns nil: this resolver performs no early resolution.
def early_resolve(connection, **)
  nil
end
# Fails every waiting connection with a ResolveTimeoutError after the selector
# waited +interval+ without the pipe becoming readable.
#
# @param interval the wait that elapsed; stored in the error
def handle_socket_timeout(interval)
  error = HTTPX::ResolveTimeoutError.new(interval, "timed out while waiting on select")
  error.set_backtrace(caller)

  # connections with an in-flight query fail first, in query order (once each)...
  @queries.each do |(_family, connection)| # rubocop:disable Style/HashEachMethods
    next unless @connections.delete(connection)

    emit_resolve_error(connection, connection.peer.host, error)
  end

  # ...followed by whichever connections were still waiting for their turn
  while (pending = @connections.shift)
    emit_resolve_error(pending, pending.peer.host, error)
  end

  close_or_resolve
end
private
# Moves the resolver to +nextstate+, performing the side effects tied to it:
# :idle clears the per-host timeouts, :open (only from :idle) creates the pipe,
# :closed (only from :open) closes both pipe ends.
#
# @return the new state, or nil when the transition was not allowed
def transition(nextstate)
  if nextstate == :idle
    @timeouts.clear
  elsif nextstate == :open
    return unless @state == :idle

    @pipe_read, @pipe_write = IO.pipe
  elsif nextstate == :closed
    return unless @state == :open

    @pipe_write.close
    @pipe_read.close
  end

  @state = nextstate
end
# Drains one event byte from the pipe and dispatches the result the resolving
# thread queued for it: addresses for DONE, an exception for ERROR. Afterwards it
# either disconnects (no connections left) or continues with the next resolution.
# Any StandardError raised along the way is handed to #on_error.
#
# @raise [ResolveError] (routed to #on_error) when the pipe reports end-of-file
def consume
  return if @connections.empty?

  event = @pipe_read.read_nonblock(1, exception: false)
  return if event == :wait_readable
  raise ResolveError, "socket pipe closed unexpectedly" if event.nil?

  case event.unpack1("C")
  when DONE
    *pair, addrs = @pipe_mutex.synchronize { @ips.pop }
    # when nothing was queued, the splat yields an empty array, which is truthy:
    # emptiness has to be tested explicitly or a nil connection would be emitted
    unless pair.empty?
      @queries.delete(pair)
      family, connection = pair
      @connections.delete(connection)
      # emit_addresses is wrapped so that a throw of :coalesced from it ends here
      catch(:coalesced) { emit_addresses(connection, family, addrs) }
    end
  when ERROR
    *pair, error = @pipe_mutex.synchronize { @ips.pop }
    if !pair.empty? && error
      @queries.delete(pair)
      _, connection = pair
      @connections.delete(connection)
      emit_resolve_error(connection, connection.peer.host, error)
    end
  end

  return disconnect if @connections.empty?

  resolve
rescue StandardError => e
  on_error(e)
end
# Starts resolving the next connection, unless a lookup is already in flight.
#
# @param connection the connection to resolve; defaults to the first queued one
# @param hostname [String, nil] name to look up; defaults to the connection's peer host
# @raise [Error] when there is no connection to resolve
def resolve(connection = nil, hostname = nil)
  # discard connections that were closed while queued
  @connections.shift while !@connections.empty? && @connections.first.state == :closed

  connection ||= @connections.first
  raise Error, "no URI to resolve" unless connection
  # a lookup is already pending; #consume calls back into this method once it is handled
  return unless @queries.empty?

  hostname ||= connection.peer.host
  scheme = connection.origin.scheme
  if connection.peer.non_ascii_hostname
    log { "resolver: resolve IDN #{connection.peer.non_ascii_hostname} as #{hostname}" }
  end

  transition(:open)
  # one pending query per address family, all answered by a single lookup
  families = connection.options.ip_families || Resolver.supported_ip_families
  @queries.concat(families.map { |family| [family, connection] })
  async_resolve(connection, hostname, scheme)
  consume
end
# Looks up +hostname+ on a background thread and reports the outcome through the pipe.
#
# For every requested family that got results, the thread queues
# [family, connection, addresses] on @ips and writes a DONE byte. On failure it
# queues [family, connection, error] once per family and writes an ERROR byte.
# #consume pops those entries when the pipe becomes readable.
#
# A lookup that times out is retried with the next timeout of the host's list.
# Once the list is exhausted, a ResolveTimeoutError is reported instead.
#
# @param connection the connection being resolved (its options and peer host are read)
# @param hostname [String] the name handed to getaddrinfo
# @param scheme [String] the URI scheme, handed to getaddrinfo as the service
def async_resolve(connection, hostname, scheme)
  families = connection.options.ip_families || Resolver.supported_ip_families
  log { "resolver: query for #{hostname}" }
  timeouts = @timeouts[connection.peer.host]

  Thread.start do
    Thread.current.report_on_exception = false
    begin
      # read inside the retried section, so that each retry uses the next timeout
      # rather than repeating the one that just expired
      resolve_timeout = timeouts.first
      addrs =
        if resolve_timeout
          Timeout.timeout(resolve_timeout) { __addrinfo_resolve(hostname, scheme) }
        else
          __addrinfo_resolve(hostname, scheme)
        end
      addrs = addrs.sort_by(&:afamily).group_by(&:afamily)
      families.each do |family|
        addresses = addrs[family]
        next unless addresses

        addresses.map!(&:ip_address)
        addresses.uniq!
        @pipe_mutex.synchronize do
          @ips.unshift([family, connection, addresses])
          @pipe_write.putc(DONE) unless @pipe_write.closed?
        end
      end
    rescue StandardError => e
      if e.is_a?(Timeout::Error)
        timeouts.shift
        retry unless timeouts.empty?
        # wrap in the library's timeout error, keeping the backtrace of the original
        timeout_error = ResolveTimeoutError.new(resolve_timeout, e.message)
        timeout_error.set_backtrace(e.backtrace)
        e = timeout_error
      end
      @pipe_mutex.synchronize do
        families.each do |family|
          @ips.unshift([family, connection, e])
          @pipe_write.putc(ERROR) unless @pipe_write.closed?
        end
      end
    end
  end
  # give the freshly started thread a chance to run
  Thread.pass
end
# Disconnects when every remaining connection already has a query in flight
# (or none remain); otherwise starts resolving the next one.
def close_or_resolve
  # drop already closed connections
  @connections.shift while !@connections.empty? && @connections.first.state == :closed

  in_flight = @queries.map(&:last)
  waiting = @connections - in_flight
  return disconnect if waiting.empty?

  resolve
end
# Performs the getaddrinfo lookup for stream sockets of any address family.
#
# @param host [String] name or address to look up
# @param scheme [String] passed to getaddrinfo as the service
# @return [Array<Addrinfo>]
def __addrinfo_resolve(host, scheme)
  family = Socket::AF_UNSPEC
  socktype = Socket::SOCK_STREAM
  Addrinfo.getaddrinfo(host, scheme, family, socktype)
end
end
end
|