1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239
|
# frozen_string_literal: true
require "resolv"
module HTTPX
class TCP
include Loggable
using URIExtensions
attr_reader :ip, :port, :addresses, :state, :interests
alias_method :host, :ip
# Sets up the TCP IO wrapper for +origin+.
#
# When +options.io+ is set, that object (or, when it is a Hash, the entry stored under the
# origin's authority) is adopted as the socket: it is flagged to be kept open, its address
# becomes the only known address, and the state starts as :connected. Otherwise the state
# starts as :idle and +addresses+ are registered as the candidates to connect to.
#
# Raises Error when +options.io+ is set but provides no IO for this origin.
def initialize(origin, addresses, options)
  @state = :idle
  @interests = :w
  @keep_open = false
  @addresses = []
  @ip_index = -1
  @ip = nil

  @hostname = origin.host
  @options = options
  @fallback_protocol = @options.fallback_protocol
  @port = origin.port

  preset_io = @options.io

  if preset_io
    @io = preset_io.is_a?(Hash) ? preset_io[origin.authority] : preset_io

    raise Error, "Given IO objects do not match the request authority" unless @io

    # NOTE(review): IO#addr describes the local end of the socket; confirm that the peer
    # address is not what is wanted here.
    _family, _port, _name, ip = @io.addr
    @ip = Resolver::Entry.new(ip)
    @addresses << @ip
    @keep_open = true
    @state = :connected
  else
    add_addresses(addresses)
  end

  # point at the last address, which is the first one to be tried.
  @ip_index = @addresses.size - 1
end
# Returns the underlying IO object held in @io (nil when none has been assigned yet).
def socket
@io
end
# Registers +addrs+ as additional connection candidates. Does nothing when +addrs+ is empty.
#
# Candidates are tried from @ip_index downwards, so the position of the new entries decides
# their priority: a batch starting with an IPv6 address is inserted right below the current
# index (making it the next in line), any other batch is put at the front of the list (to be
# tried last). @ip_index is then shifted by the number of entries added.
#
# No existing address is ever discarded: when @ip_index is negative (every candidate was
# already tried), the new entries are inserted at the front instead of replacing the old ones.
def add_addresses(addrs)
  return if addrs.empty?

  if addrs.first.ipv6?
    # should be the next in line
    # the index is clamped because Array#insert counts negative indexes from the end, and
    # pads with nil when the index lies beyond the end of the array.
    insert_at = (@ip_index || (@addresses.size - 1)).clamp(0, @addresses.size)
    @addresses.insert(insert_at, *addrs)
  else
    @addresses.unshift(*addrs)
  end

  @ip_index += addrs.size
end
# Eliminates expired entries and returns whether there are still any left.
#
# The remaining entries are regrouped with the IPv4 addresses first and the IPv6 addresses
# last, keeping the relative order inside each family (a comparison-based sort returning 0
# for same-family pairs does not guarantee that). When entries were removed, @ip_index is
# reset to the last position.
def addresses?
  prev_addr_size = @addresses.size

  @addresses.delete_if(&:expired?)

  # stable regrouping, done in place so that the same array object is kept.
  ipv4_addrs, ipv6_addrs = @addresses.partition { |addr| !addr.ipv6? }
  @addresses.replace(ipv4_addrs.concat(ipv6_addrs))

  @ip_index = @addresses.size - 1 if prev_addr_size != @addresses.size

  !@addresses.empty?
end
# Delegates to the +to_io+ of the underlying IO object held in @io.
# NOTE(review): raises NoMethodError while @io is still nil; presumably only called once
# a socket exists - confirm against callers.
def to_io
@io.to_io
end
# Returns the fallback protocol, as read from the options when this object was built.
def protocol
@fallback_protocol
end
# Starts (or resumes) a non-blocking connection attempt to the address at @ip_index.
#
# Does nothing unless the IO is in a closed state, or when there are no known addresses.
# Candidate addresses are tried from @ip_index down to 0: when an attempt fails with one of
# the rescued errors, the socket of that attempt is closed, a new one is built for the next
# candidate, and the attempt is retried. Once no candidate is left, the last error is
# re-raised (Errno::ETIMEDOUT is re-raised as a ConnectTimeoutError carrying the configured
# connect timeout).
def connect
  return unless closed?

  if @addresses.empty?
    # an idle connection trying to connect with no available addresses is a connection
    # out of the initial context which is back to the DNS resolution loop. This may
    # happen in a fiber-aware context where a connection reconnects with expired addresses,
    # and context is passed back to a fiber on the same connection while waiting for the
    # DNS answer.
    log { "tried connecting while resolving, skipping..." }
    return
  end

  if !@io || @io.closed?
    transition(:idle)
    @io = build_socket
  end
  try_connect
rescue Errno::EHOSTUNREACH,
       Errno::ENETUNREACH => e
  @ip_index -= 1

  raise e if @ip_index.negative?

  log { "failed connecting to #{@ip} (#{e.message}), evict from cache and trying next..." }

  @options.resolver_cache.evict(@hostname, @ip)

  replace_failed_socket
  retry
rescue Errno::ECONNREFUSED,
       Errno::EADDRNOTAVAIL,
       SocketError,
       IOError => e
  @ip_index -= 1

  raise e if @ip_index.negative?

  log { "failed connecting to #{@ip} (#{e.message}), trying next..." }

  replace_failed_socket
  retry
rescue Errno::ETIMEDOUT => e
  @ip_index -= 1

  raise ConnectTimeoutError.new(@options.timeout[:connect_timeout], e.message) if @ip_index.negative?

  log { "failed connecting to #{@ip} (#{e.message}), trying next..." }

  replace_failed_socket
  retry
end

# Swaps the socket of a failed attempt for a fresh one targeting the address currently at
# @ip_index. The failed socket is closed explicitly, so that its file descriptor is released
# right away instead of staying open until the object is garbage-collected.
def replace_failed_socket
  @io.close if @io && !@io.closed?
  @io = build_socket
end
private :replace_failed_socket
# Issues a non-blocking connect to the current ip and port, and records which readiness
# event should be waited on next in @interests.
#
# Returns nil while the connection is still in progress. Otherwise moves to the :connected
# state and returns :w; :w is also returned when Errno::EALREADY is raised.
def try_connect
  sockaddr = Socket.sockaddr_in(@port, @ip.to_s)
  outcome = @io.connect_nonblock(sockaddr, exception: false)

  log(level: 3, color: :cyan) { "TCP CONNECT: #{outcome}..." }

  if outcome == :wait_readable
    @interests = :r
    nil
  elsif outcome == :wait_writable
    @interests = :w
    nil
  else
    transition(:connected)
    @interests = :w
  end
rescue Errno::EALREADY
  @interests = :w
end
private :try_connect
# Reads up to +size+ bytes from the socket into +buffer+, without blocking.
#
# Returns 0 (after emptying +buffer+) when nothing can be read yet, nil when the read
# yields nil, and the byte size of +buffer+ otherwise.
def read(size, buffer)
  outcome = @io.read_nonblock(size, buffer, exception: false)

  case outcome
  when :wait_readable
    buffer.clear
    0
  when nil
    nil
  else
    log { "READ: #{buffer.bytesize} bytes..." }
    buffer.bytesize
  end
end
# Writes as much of +buffer+ as the socket accepts, without blocking.
#
# Returns 0 when the socket is not writable yet and nil when the write yields nil.
# Otherwise the written bytes are dropped from the front of +buffer+ and their count
# is returned.
def write(buffer)
  written = @io.write_nonblock(buffer, exception: false)

  case written
  when :wait_writable
    0
  when nil
    nil
  else
    log { "WRITE: #{written} bytes..." }
    buffer.shift!(written)
    written
  end
end
# Closes the underlying socket and moves to the :closed state.
#
# Does nothing for IOs flagged to be kept open, nor when already in a closed state.
# The state transition happens even if closing the socket raises.
def close
  return if @keep_open
  return if closed?

  begin
    @io.close
  ensure
    transition(:closed)
  end
end
# Returns true when the current state is :connected.
def connected?
@state == :connected
end
# Returns true when there is no established connection, i.e. the state is either
# :idle or :closed.
def closed?
  case @state
  when :idle, :closed then true
  else false
  end
end
# :nocov:
def inspect
"#<#{self.class}:#{object_id} " \
"#{@ip}:#{@port} " \
"@state=#{@state} " \
"@hostname=#{@hostname} " \
"@addresses=#{@addresses} " \
"@state=#{@state}>"
end
# :nocov:
private
# Makes the address at @ip_index the current ip, and returns a new, unconnected stream
# socket of the matching address family.
def build_socket
  target = @addresses[@ip_index]
  @ip = target
  Socket.new(target.family, :STREAM, 0)
end
# Moves to +nextstate+, provided the current state allows it: :connected can only be
# reached from :idle, and :closed only from :connected. Any other target state is
# always accepted. Returns nil when the transition is refused.
def transition(nextstate)
  required_state =
    if nextstate == :connected
      :idle
    elsif nextstate == :closed
      :connected
    end

  return unless required_state.nil? || @state == required_state

  do_transition(nextstate)
end
# Unconditionally switches to +nextstate+, logging the change at level 1.
def do_transition(nextstate)
# the message built by log_transition_state reads @state as the state being left,
# which is why logging comes before the assignment.
log(level: 1) { log_transition_state(nextstate) }
@state = nextstate
end
# Builds the log message for a state change: the host, followed by the socket's file
# descriptor number when moving to :connected, then the current and the next state.
def log_transition_state(nextstate)
  label = nextstate == :connected ? "#{host}(##{@io.fileno})" : host

  "#{label} #{@state} -> #{nextstate}"
end
end
end
|