File: tcp.rb

package info (click to toggle)
ruby-httpx 1.7.2-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,816 kB
  • sloc: ruby: 12,209; makefile: 4
file content (239 lines) | stat: -rw-r--r-- 5,642 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
# frozen_string_literal: true

require "resolv"

module HTTPX
  # Non-blocking TCP transport.
  #
  # Wraps a stream socket and keeps a list of candidate addresses for the origin host.
  # Candidates are attempted starting at the end of the list and moving towards the
  # beginning (see #connect), until one of them connects or the list is exhausted.
  class TCP
    include Loggable

    using URIExtensions

    attr_reader :ip, :port, :addresses, :state, :interests

    alias_method :host, :ip

    # +origin+ must respond to +host+, +port+ and +authority+; +addresses+ is the initial
    # list of candidate addresses; +options+ must respond to +fallback_protocol+ and +io+
    # (and, later on, to +resolver_cache+ and +timeout+, which #connect uses).
    #
    # When <tt>options.io</tt> is set, that IO object is used as-is (a Hash is looked up by
    # the origin authority): the instance starts in the +:connected+ state and will never
    # close the IO by itself.
    #
    # Raises Error when <tt>options.io</tt> is set but yields no IO for this origin.
    def initialize(origin, addresses, options)
      @state = :idle
      @keep_open = false
      @addresses = []
      @ip_index = -1
      @ip = nil
      @hostname = origin.host
      @options = options
      @fallback_protocol = @options.fallback_protocol
      @port = origin.port
      @interests = :w
      if @options.io
        @io = case @options.io
              when Hash
                @options.io[origin.authority]
              else
                @options.io
              end
        raise Error, "Given IO objects do not match the request authority" unless @io

        # NOTE(review): relies on the 4th element of #addr being the peer-independent
        # numeric address, as IPSocket#addr returns it -- confirm for custom IO objects.
        _, _, _, ip = @io.addr
        @ip = Resolver::Entry.new(ip)
        @addresses << @ip
        # the IO was handed in from the outside, so #close must leave it alone.
        @keep_open = true
        @state = :connected
      else
        add_addresses(addresses)
      end
      # connection attempts start with the last candidate.
      @ip_index = @addresses.size - 1
    end

    # Returns the underlying IO object (+nil+ until a socket has been built).
    def socket
      @io
    end

    # Adds +addrs+ to the list of candidates, keeping the candidate currently pointed at
    # by the internal index in place.
    #
    # If the first new address is IPv6, the new addresses are inserted right before the
    # current candidate, which makes them the next ones attempted once the current one
    # fails (the index walks downwards). Otherwise they are prepended, i.e. attempted last.
    def add_addresses(addrs)
      return if addrs.empty?

      ip_index = @ip_index || (@addresses.size - 1)
      if addrs.first.ipv6?
        # should be the next in line
        @addresses = [*@addresses[0, ip_index], *addrs, *@addresses[ip_index..-1]]
      else
        @addresses.unshift(*addrs)
      end
      # everything from the current candidate onwards moved up by addrs.size positions.
      @ip_index += addrs.size
    end

    # eliminates expired entries and returns whether there are still any left.
    #
    # The remaining entries are reordered so that non-IPv6 addresses come before IPv6
    # ones. The reordering is stable (Array#sort! is not), so the relative order of
    # addresses of the same family, as arranged by #add_addresses, is preserved.
    def addresses?
      prev_addr_size = @addresses.size

      @addresses.delete_if(&:expired?)

      others, ipv6 = @addresses.partition { |addr| !addr.ipv6? }
      # replace in place, so that references obtained via #addresses stay valid.
      @addresses.replace(others.concat(ipv6))

      # entries were dropped: restart from the last remaining candidate.
      @ip_index = @addresses.size - 1 if prev_addr_size != @addresses.size

      @addresses.any?
    end

    # Returns the IO object to be used in IO.select-like calls.
    def to_io
      @io.to_io
    end

    # Returns the protocol taken from <tt>options.fallback_protocol</tt>.
    def protocol
      @fallback_protocol
    end

    # Starts, or carries on with, the non-blocking connection handshake. Does nothing
    # unless the state is +:idle+ or +:closed+.
    #
    # When an attempt fails with one of the rescued errors, the failed socket is closed,
    # the previous candidate in the list is selected and the attempt is retried. Host or
    # network unreachable errors additionally evict the address from the resolver cache.
    #
    # Once there are no candidates left, the last error is re-raised; a timeout is
    # re-raised as ConnectTimeoutError.
    def connect
      return unless closed?

      if @addresses.empty?
        # an idle connection trying to connect with no available addresses is a connection
        # out of the initial context which is back to the DNS resolution loop. This may
        # happen in a fiber-aware context where a connection reconnects with expired addresses,
        # and context is passed back to a fiber on the same connection while waiting for the
        # DNS answer.
        log { "tried connecting while resolving, skipping..." }

        return
      end

      if !@io || @io.closed?
        transition(:idle)
        @io = build_socket
      end
      try_connect
    rescue Errno::EHOSTUNREACH,
           Errno::ENETUNREACH => e
      @ip_index -= 1

      raise e if @ip_index.negative?

      log { "failed connecting to #{@ip} (#{e.message}), evict from cache and trying next..." }
      @options.resolver_cache.evict(@hostname, @ip)

      close_failed_socket
      @io = build_socket
      retry
    rescue Errno::ECONNREFUSED,
           Errno::EADDRNOTAVAIL,
           SocketError,
           IOError => e
      @ip_index -= 1

      raise e if @ip_index.negative?

      log { "failed connecting to #{@ip} (#{e.message}), trying next..." }

      close_failed_socket
      @io = build_socket
      retry
    rescue Errno::ETIMEDOUT => e
      @ip_index -= 1

      raise ConnectTimeoutError.new(@options.timeout[:connect_timeout], e.message) if @ip_index.negative?

      log { "failed connecting to #{@ip} (#{e.message}), trying next..." }

      close_failed_socket
      @io = build_socket
      retry
    end

    # Performs a single non-blocking connect call against the current address.
    #
    # If the call would block, only the readiness event to wait for is recorded in
    # +@interests+; otherwise the state moves to +:connected+.
    def try_connect
      ret = @io.connect_nonblock(Socket.sockaddr_in(@port, @ip.to_s), exception: false)
      log(level: 3, color: :cyan) { "TCP CONNECT: #{ret}..." }
      case ret
      when :wait_readable
        @interests = :r
        return
      when :wait_writable
        @interests = :w
        return
      end
      transition(:connected)
      @interests = :w
    rescue Errno::EALREADY
      # a previous connect call is still in progress, keep waiting for writability.
      @interests = :w
    end
    private :try_connect

    # Reads up to +size+ bytes into +buffer+ without blocking.
    #
    # Returns the number of bytes held by +buffer+ after the read, +0+ (with +buffer+
    # emptied) if the read would block, or +nil+ if the read call returned +nil+
    # (end of file).
    def read(size, buffer)
      ret = @io.read_nonblock(size, buffer, exception: false)
      if ret == :wait_readable
        buffer.clear
        return 0
      end
      return if ret.nil?

      log { "READ: #{buffer.bytesize} bytes..." }
      buffer.bytesize
    end

    # Writes as much of +buffer+ as possible without blocking, and removes the written
    # bytes from it via <tt>buffer.shift!</tt>.
    #
    # Returns the number of bytes written, +0+ if the write would block, or +nil+ if the
    # write call returned +nil+.
    def write(buffer)
      siz = @io.write_nonblock(buffer, exception: false)
      return 0 if siz == :wait_writable
      return if siz.nil?

      log { "WRITE: #{siz} bytes..." }

      buffer.shift!(siz)
      siz
    end

    # Closes the socket, unless it was provided via <tt>options.io</tt> or the state is
    # already +:idle+ or +:closed+. The state transition happens even if closing raises.
    def close
      return if @keep_open || closed?

      begin
        @io.close
      ensure
        transition(:closed)
      end
    end

    # Whether the state is +:connected+.
    def connected?
      @state == :connected
    end

    # Whether the state is +:idle+ or +:closed+, i.e. there is no established connection.
    def closed?
      @state == :idle || @state == :closed
    end

    # :nocov:
    def inspect
      "#<#{self.class}:#{object_id} " \
        "#{@ip}:#{@port} " \
        "@state=#{@state} " \
        "@hostname=#{@hostname} " \
        "@addresses=#{@addresses}>"
    end
    # :nocov:

    private

    # Selects the candidate address at the current index and returns a new, unconnected
    # stream socket of the matching address family.
    def build_socket
      @ip = @addresses[@ip_index]
      Socket.new(@ip.family, :STREAM, 0)
    end

    # Releases the socket of a failed connection attempt before it gets replaced, so that
    # its file descriptor does not linger until garbage collection.
    #
    # This is best-effort cleanup: an error while closing is logged and ignored, so that
    # it cannot get in the way of attempting the next address.
    def close_failed_socket
      @io.close if @io && !@io.closed?
    rescue IOError, SystemCallError => e
      log { "failed closing socket of failed connection attempt (#{e.message}), ignoring..." }
    end

    # Moves to +nextstate+ if allowed from the current state: +:connected+ can only be
    # reached from +:idle+, and +:closed+ only from +:connected+. Any other target state
    # is accepted unconditionally.
    def transition(nextstate)
      case nextstate
      # when :idle
      when :connected
        return unless @state == :idle
      when :closed
        return unless @state == :connected
      end
      do_transition(nextstate)
    end

    # Logs the state change and applies it.
    def do_transition(nextstate)
      log(level: 1) { log_transition_state(nextstate) }
      @state = nextstate
    end

    # Builds the log message for a state change; the file descriptor number is included
    # when moving to +:connected+.
    def log_transition_state(nextstate)
      label = host
      label = "#{label}(##{@io.fileno})" if nextstate == :connected
      "#{label} #{@state} -> #{nextstate}"
    end
  end
end