File: connection.rb

package info (click to toggle)
ruby-httpx 1.7.2-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,816 kB
  • sloc: ruby: 12,209; makefile: 4
file content (965 lines) | stat: -rw-r--r-- 27,950 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
# frozen_string_literal: true

require "resolv"
require "forwardable"
require "httpx/io"
require "httpx/buffer"

module HTTPX
  # The Connection can be watched for IO events.
  #
  # It contains the +io+ object to read/write from, and knows what to do when it can.
  #
  # It defers connecting until absolutely necessary. Connection should be triggered from
  # the IO selector (until then, any request will be queued).
  #
  # A connection boots up its parser after connection is established. All pending requests
  # will be redirected there after connection.
  #
  # A connection can be prevented from closing by the parser, that is, if there are pending
  # requests. This will signal that the connection was prematurely closed, due to a possible
  # number of conditions:
  #
  # * Remote peer closed the connection ("Connection: close");
  # * Remote peer doesn't support pipelining;
  #
  # A connection may also route requests for a host other than the one the +io+ was connected
  # to, provided that the IP, the port and the scheme are the same. This allows sharing the
  # same socket to send HTTP/2 requests to different hosts.
  #
  class Connection
    extend Forwardable
    include Loggable
    include Callbacks

    # activates the URIExtensions refinements for the code below.
    using URIExtensions

    # Connection#empty? is answered by the write buffer.
    def_delegator :@write_buffer, :empty?

    attr_reader :type, :io, :origin, :origins, :state, :pending, :options, :ssl_session, :sibling

    attr_writer :current_selector

    attr_accessor :current_session, :family

    # these two readers are only callable from within Connection instances.
    protected :ssl_session, :sibling

    # Builds a connection to the origin of +uri+, configured with +options+.
    #
    # When the options carry an already open IO, the socket is wrapped and the parser is
    # booted right away; otherwise the connection starts in the +:idle+ state. Addresses
    # carried by the options are assigned at the end.
    def initialize(uri, options)
      # collaborators and bookkeeping which only become known later start out unset.
      @current_session = nil
      @current_selector = nil
      @parser = nil
      @sibling = nil
      @coalesced_connection = nil
      @altsvc_connection = nil
      @family = nil
      @io = nil
      @ssl_session = nil
      @timeout = nil
      @connected_at = nil
      @response_received_at = nil

      @exhausted = false
      @cloned = false
      @main_sibling = false

      @options = Options.new(options)
      @type = initialize_type(uri, @options)
      @origins = [uri.origin]
      @origin = Utils.to_uri(uri.origin)
      @window_size = @options.window_size
      @read_buffer = Buffer.new(@options.buffer_size)
      @write_buffer = Buffer.new(@options.buffer_size)
      @pending = []
      @inflight = 0
      @keep_alive_timeout = @options.timeout[:keep_alive_timeout]

      if @options.io
        # if there's an already open IO, get its
        # peer address, and force-initiate the parser
        transition(:already_open)
        @io = build_socket
        parser
      else
        transition(:idle)
      end

      preset_addresses = @options.addresses
      self.addresses = preset_addresses if preset_addresses
    end

    # The origin URI of this connection; this is what gets handed to the socket
    # objects in #build_socket.
    def peer
      @origin
    end

    # this is a semi-private method, to be used by the resolver
    # to initiate the io object.
    #
    # The first call builds the socket for +addrs+; once a socket exists, the
    # addresses are handed over to it instead.
    def addresses=(addrs)
      return @io.add_addresses(addrs) if @io

      @io = build_socket(addrs)
    end

    # Addresses reported by the socket, or +nil+ while no socket has been built.
    def addresses
      return unless @io

      @io.addresses
    end

    # Forwards the question to the socket; +nil+ while no socket has been built.
    def addresses?
      return unless @io

      @io.addresses?
    end

    # Tells whether this connection can carry requests for +uri+ made with +options+.
    #
    # A never-used connection which is closing or closed never matches. Otherwise the
    # origin of +uri+ must be among the known origins, and the options must be equal.
    def match?(uri, options)
      return false if !used? && (@state == :closing || @state == :closed)

      requested_origin = uri.origin
      return false unless @origins.include?(requested_origin)

      # if there is more than one origin to match, it means that this connection
      # was the result of coalescing. To prevent blind trust in the case where the
      # origin came from an ORIGIN frame, we're going to verify the hostname with the
      # SSL certificate
      trusted = @origins.size == 1 ||
                @origin == requested_origin ||
                (@io.is_a?(SSL) && @io.verify_hostname(uri.host))

      trusted && @options == options
    end

    # Tells whether +connection+ targets the same endpoint as this one: either this
    # connection is open for the same origin, or both share at least one address. In
    # both cases the options must be equal.
    def mergeable?(connection)
      return false if !@io || %i[closing closed].include?(@state)

      candidate_addresses = connection.addresses
      return false unless candidate_addresses

      same_endpoint = (open? && @origin == connection.origin) ||
                      !(@io.addresses & candidate_addresses).empty?

      same_endpoint && @options == connection.options
    end

    # coalesces +self+ into +connection+.
    #
    # From here on +connection+ is remembered as the coalesced connection (#send and
    # #io_connected? delegate to it), the sibling is closed, and +connection+ merges
    # this connection's origins, ssl session and pending requests (see #merge).
    def coalesce!(connection)
      @coalesced_connection = connection

      close_sibling
      connection.merge(self)
    end

    # Returns the connection this one was coalesced into (truthy), or +nil+ if it wasn't.
    def coalesced?
      @coalesced_connection
    end

    # coalescable connections need to be mergeable!
    # but internally, #mergeable? is called before #coalescable?
    #
    # For an "h2" socket where both origins are https and the peer can be verified, the
    # answer comes from verifying the other origin's host against the socket; in every
    # other case the two origins must be equal.
    def coalescable?(connection)
      other_origin = connection.origin

      verifiable = @io.protocol == "h2" &&
                   @origin.scheme == "https" &&
                   other_origin.scheme == "https" &&
                   @io.can_verify_peer?

      return @origin == other_origin unless verifiable

      @io.verify_hostname(other_origin.host)
    end

    # Absorbs +connection+ into this one: its origins are added to the known origins, its
    # ssl session is adopted when this connection has none, and its pending requests
    # are re-sent through this connection.
    def merge(connection)
      @origins |= connection.instance_variable_get(:@origins)

      if @ssl_session.nil? && (inherited_session = connection.ssl_session)
        @ssl_session = inherited_session
        # keep tracking session renewals on the existing socket.
        @io.session_new_cb { |sess| @ssl_session = sess } if @io
      end

      connection.purge_pending { |req| send(req) }
    end

    # Runs +block+ over the requests held by the parser (if any) and then over the
    # connection's own pending requests, dropping those for which it returns a truthy
    # value. The inflight counter is first reduced by the number of parser-held requests.
    def purge_pending(&block)
      queues = [@pending]

      if @parser
        parser_queue = @parser.pending
        @inflight -= parser_queue.size
        # parser requests are handled before the connection's own.
        queues.unshift(parser_queue)
      end

      queues.each { |queue| queue.reject!(&block) }
    end

    # Tells whether the socket is in the +:connected+ state. A coalesced connection
    # answers for the connection it was coalesced into.
    def io_connected?
      delegate = @coalesced_connection
      return delegate.io_connected? if delegate
      return unless @io

      @io.state == :connected
    end

    # A connection counts as connecting for as long as it is in the +:idle+ state.
    def connecting?
      @state == :idle
    end

    # Tells whether there is still work going on: +nil+ without a parser, otherwise true
    # when either the parser or the write buffer is not empty.
    def inflight?
      return unless @parser

      # * parser may be dealing with other requests (possibly started from a different fiber)
      # * connection may be doing connection termination handshake
      !(@parser.empty? && @write_buffer.empty?)
    end

    # Returns what the selector should wait on for this connection.
    #
    # An idle connection first tries to connect; while it remains idle, the socket
    # answers. After that the parser answers, and +nil+ is returned without a parser.
    # Errors raised along the way are passed to #on_error and +nil+ is returned.
    def interests
      if connecting?
        connect

        # still connecting after the attempt: the socket knows what it is waiting for.
        return @io.interests if connecting?
      end

      @parser.interests if @parser
    rescue StandardError => error
      on_error(error)
      nil
    end

    # Exposes the IO object of the socket (the socket must have been built already).
    def to_io
      @io.to_io
    end

    # Runs one processing round, depending on the current state:
    #
    # * +:idle+: tries to connect, and consumes unless that left the connection closed;
    # * +:closing+: consumes, then transitions to +:closed+;
    # * +:open+: consumes;
    # * any other state: does nothing.
    #
    # Standard errors clear the write buffer and go through #on_error; any other
    # exception force-closes the connection (dropping pending requests) and is re-raised.
    def call
      case @state
      when :open
        consume
      when :closing
        consume
        transition(:closed)
      when :idle
        connect

        # when opening the tcp or ssl socket fails
        consume unless @state == :closed
      end
      nil
    rescue StandardError => error
      @write_buffer.clear
      on_error(error)
    rescue Exception => fatal # rubocop:disable Lint/RescueException
      force_close(true)
      raise fatal
    end

    # Asks the parser (if there is one) to close, reactivating an inactive
    # connection first.
    def close
      transition(:active) if @state == :inactive
      return unless @parser

      @parser.close
    end

    # Closes the connection, cleaning up first when it never got to do anything: an idle
    # connection is purged and disconnected, a closed one forgets when it was connected.
    def terminate
      if @state == :idle
        purge_after_closed
        disconnect
      elsif @state == :closed
        @connected_at = nil
      end

      close
    end

    # bypasses state machine rules while setting the connection in the
    # :closed state.
    #
    # With +delete_pending+, the pending requests are dropped; otherwise the requests held
    # by the parser are moved back to the pending list. Nothing else happens when the
    # connection is already closed; if not, buffers are cleaned up, the connection is
    # disconnected and +:force_closed+ is emitted.
    def force_close(delete_pending = false)
      if delete_pending
        @pending.clear
      elsif @parser
        enqueue_pending_requests_from_parser(@parser)
      end

      unless @state == :closed
        @state = :closed
        @write_buffer.clear
        purge_after_closed
        disconnect
        emit(:force_closed, delete_pending)
      end
    end

    # bypasses the state machine to force closing of connections still connecting.
    # **only** used for Happy Eyeballs v2.
    #
    # The state is preset to +:closing+, since the +:closed+ transition is only
    # carried out from there. +cloned+ is remembered, and later handed to the session
    # when disconnecting.
    def force_reset(cloned = false)
      @state = :closing
      @cloned = cloned
      transition(:closed)
    end

    # Walks the connection through +:closing+ and then +:closed+, unless it is already
    # in one of those states.
    def reset
      return if %i[closing closed].include?(@state)

      transition(:closing)

      transition(:closed)
    end

    # Sends +request+ through this connection (or through the connection it was
    # coalesced into).
    #
    # The request is parked in the pending list when there is no parser yet or the
    # write buffer is full. It is also parked when the keep alive timeout has elapsed
    # since the last response, in which case the connection is pinged first.
    def send(request)
      return @coalesced_connection.send(request) if @coalesced_connection

      return @pending << request if !@parser || @write_buffer.full?

      keep_alive_expired = @response_received_at && @keep_alive_timeout &&
                           Utils.elapsed_time(@response_received_at) > @keep_alive_timeout

      return send_request_to_parser(request) unless keep_alive_expired

      # when pushing a request into an existing connection, we have to check whether there
      # is the possibility that the connection might have extended the keep alive timeout.
      # for such cases, we want to ping for availability before deciding to shovel requests.
      log(level: 3) { "keep alive timeout expired, pinging connection..." }
      @pending << request
      transition(:active) if @state == :inactive
      parser.ping
      request.ping!
      nil
    end

    # The interval the selector may wait on this connection: none for closed or
    # inactive connections, else the current timeout if one is set, else the connect
    # timeout (when idle) or the operation timeout from the options.
    def timeout
      return if %i[closed inactive].include?(@state)
      return @timeout if @timeout

      phase = @state == :idle ? :connect_timeout : :operation_timeout
      @options.timeout[phase]
    end

    # Takes the connection back to the +:idle+ state so that it can be reused: the
    # socket and buffers are cleaned up, and the parser is dropped.
    def idling
      purge_after_closed
      @write_buffer.clear
      transition(:idle)
      @parser = nil
    end

    # Truthy once the connection has been opened: returns the time the socket got
    # connected, which is unset again when the connection goes back to idle.
    def used?
      @connected_at
    end

    # Requests the transition to the +:inactive+ state (only carried out for open
    # connections which are not in use, see #handle_transition).
    def deactivate
      transition(:inactive)
    end

    # Inactive connections still count as open.
    def open?
      %i[open inactive].include?(@state)
    end

    # Reports, through #on_error, that +interval+ elapsed while waiting on select.
    def handle_socket_timeout(interval)
      timeout_error = OperationTimeoutError.new(interval, "timed out while waiting on select")
      timeout_error.set_backtrace(caller)
      on_error(timeout_error)
    end

    # Pairs this connection with +connection+.
    #
    # When +connection+ has no sibling yet, this connection becomes the main sibling
    # and also sets itself as the sibling of +connection+.
    def sibling=(connection)
      @sibling = connection
      return unless connection

      @main_sibling = connection.sibling.nil?
      connection.sibling = self if @main_sibling
    end

    # Deals with a failure to connect. While a sibling is still connecting, this
    # connection is merged into it and then reset as cloned; otherwise +error+ goes
    # through #on_error.
    def handle_connect_error(error)
      if @sibling && @sibling.connecting?
        @sibling.merge(self)

        force_reset(true)
      else
        on_error(error)
      end
    end

    # disconnects from the current session it's attached to
    #
    # Nothing happens while the connection is exhausted, or when it is not attached to
    # both a session and a selector.
    def disconnect
      return if @exhausted # it'll reset

      session = @current_session
      selector = @current_selector
      return unless session && selector

      @current_session = nil
      @current_selector = nil

      session.deselect_connection(self, selector, @cloned)
    end

    # Fails the outstanding requests with +error+ (see #handle_error) and resets the
    # connection. +request+, when given, is the request the error relates to.
    #
    # Operation timeouts get special treatment: they are ignored by inactive
    # connections, and while the current timeout has not been used up yet. During
    # connect, they are turned into connection errors.
    def on_error(error, request = nil)
      case error
      when OperationTimeoutError
        # inactive connections do not contribute to the select loop, therefore
        # they should not fail due to such errors.
        return if @state == :inactive

        if @timeout
          @timeout -= error.timeout
          return unless @timeout <= 0
        end

        error = error.to_connection_error if connecting?
      end

      handle_error(error, request)
      reset
    end

    # :nocov:
    def inspect
      "#<#{self.class}:#{object_id} " \
        "@origin=#{@origin} " \
        "@state=#{@state} " \
        "@pending=#{@pending.size} " \
        "@io=#{@io}>"
    end
    # :nocov:

    private

    # Connecting is requesting the +:open+ transition, which drives the socket
    # connect (see #handle_transition).
    def connect
      transition(:open)
    end

    # The read/write loop of the connection.
    #
    # Each round asks the parser to consume, reads from the socket into the parser for as
    # long as reading is of interest, flushes the write buffer for as long as writing is
    # of interest, and shovels pending requests into the parser while the connection is
    # open. It returns to the event loop when there are no more requests to process, when
    # the socket is drained in the direction(s) of interest, or after the socket was found
    # closed (which is reported through #on_error).
    def consume
      return unless @io

      catch(:called) do
        # set when the last write attempt failed with a broken pipe; it forces
        # a read attempt regardless of the current interests.
        epiped = false
        loop do
          # connection may have gone back to the idle state in the meantime
          # (see #idling), in which case there is nothing to consume.
          return if @state == :idle

          parser.consume

          # we exit if there's no more requests to process
          #
          # this condition takes into account:
          #
          # * the number of inflight requests
          # * the number of pending requests
          # * whether the write buffer has bytes (i.e. for close handshake)
          if @pending.empty? && @inflight.zero? && @write_buffer.empty?
            log(level: 3) { "NO MORE REQUESTS..." } if @parser && @parser.pending.any?

            # terminate if an altsvc connection has been established
            terminate if @altsvc_connection

            return
          end

          @timeout = @current_timeout

          read_drained = false
          write_drained = nil

          #
          # tight read loop.
          #
          # read as much of the socket as possible.
          #
          # this tight loop reads all the data it can from the socket and pipes it to
          # its parser.
          #
          loop do
            siz = @io.read(@window_size, @read_buffer)
            log(level: 3, color: :cyan) { "IO READ: #{siz} bytes... (wsize: #{@window_size}, rbuffer: #{@read_buffer.bytesize})" }
            # a falsy size is treated as the descriptor having been closed.
            unless siz
              @write_buffer.clear

              ex = EOFError.new("descriptor closed")
              ex.set_backtrace(caller)
              on_error(ex)
              return
            end

            # socket has been drained. mark and exit the read loop.
            if siz.zero?
              read_drained = @read_buffer.empty?
              epiped = false
              break
            end

            parser << @read_buffer.to_s

            # stop reading once the connection wants to write
            # (unless the last write attempt hit a broken pipe).
            break if interests == :w && !epiped

            # exit the read loop if connection is preparing to be closed
            break if @state == :closing || @state == :closed

            # exit #consume altogether if all outstanding requests have been dealt with
            if @pending.empty? && @inflight.zero? && @write_buffer.empty? # rubocop:disable Style/Next
              log(level: 3) { "NO MORE REQUESTS..." } if @parser && @parser.pending.any?

              # terminate if an altsvc connection has been established
              terminate if @altsvc_connection

              return
            end
          end unless ((ints = interests).nil? || ints == :w || @state == :closing) && !epiped

          #
          # tight write loop.
          #
          # flush as many bytes as the sockets allow.
          #
          loop do
            # buffer has been drained, mark and exit the write loop.
            if @write_buffer.empty?
              # we only mark as drained on the first loop
              write_drained = write_drained.nil? && @inflight.positive?

              break
            end

            begin
              siz = @io.write(@write_buffer)
            rescue Errno::EPIPE
              # this can happen if we still have bytes in the buffer to send to the server, but
              # the server wants to respond immediately with some message, or an error. An example is
              # when one's uploading a big file to an unintended endpoint, and the server stops the
              # consumption, and responds immediately with an authorization or even method not allowed error.
              # at this point, we have to let the connection switch to read-mode.
              log(level: 2) { "pipe broken, could not flush buffer..." }
              epiped = true
              read_drained = false
              break
            end
            log(level: 3, color: :cyan) { "IO WRITE: #{siz} bytes..." }
            # a falsy size is treated as the descriptor having been closed.
            unless siz
              @write_buffer.clear

              ex = EOFError.new("descriptor closed")
              ex.set_backtrace(caller)
              on_error(ex)
              return
            end

            # socket closed for writing. mark and exit the write loop.
            if siz.zero?
              write_drained = !@write_buffer.empty?
              break
            end

            # exit write loop if marked to consume from peer, or is closing.
            break if interests == :r || @state == :closing || @state == :closed

            write_drained = false
          end unless (ints = interests) == :r

          send_pending if @state == :open

          # go for another round unless the socket is drained
          next unless (ints != :r || read_drained) && (ints != :w || write_drained)

          # gotta go back to the event loop. It happens when:
          #
          # * the socket is drained of bytes or it's not the interest of the conn to read;
          # * theres nothing more to write, or it's not in the interest of the conn to write;
          log(level: 3) { "(#{ints}): WAITING FOR EVENTS..." }
          return
        end
      end
    end

    # Moves pending requests into the parser, in order, until the write buffer is
    # full or there are no more pending requests.
    def send_pending
      until @write_buffer.full?
        request = @pending.shift
        break unless request

        send_request_to_parser(request)
      end
    end

    # The parser of this connection, built on first use (see #build_parser).
    def parser
      @parser ||= build_parser
    end

    # Hands +request+ over to the parser: counts it as inflight, stamps it with the
    # address of the socket and arms its timeouts first. An inactive connection gets
    # reactivated afterwards.
    def send_request_to_parser(request)
      @inflight += 1
      ip = @io.ip
      request.peer_address = ip && ip.address
      set_request_timeouts(request)

      parser.send(request)

      if @state == :inactive
        transition(:active)
        # mark request as ping, as this inactive connection may have been
        # closed by the server, and we don't want that to influence retry
        # bookkeeping.
        request.ping!
      end
    end

    # Puts the requests still held by +parser+ back at the front of the connection's
    # pending list, and stops counting them as inflight.
    def enqueue_pending_requests_from_parser(parser)
      requeued = parser.pending

      unless requeued.empty?
        # the connection will be reused, so parser requests must come
        # back to the pending list before the parser is reset.
        @inflight -= requeued.size
        @pending.prepend(*requeued)
      end
    end

    # Instantiates the parser for +protocol+ (by default, the one reported by the
    # socket) on top of the write buffer, and wires its callbacks.
    def build_parser(protocol = @io.protocol)
      parser_type(protocol).new(@write_buffer, @options).tap do |new_parser|
        set_parser_callbacks(new_parser)
      end
    end

    # Subscribes the connection to the events emitted by +parser+.
    def set_parser_callbacks(parser)
      # a response arrived: bookkeeping first, then the request gets notified.
      parser.on(:response) do |request, response|
        AltSvc.emit(request, response) do |alt_origin, origin, alt_params|
          build_altsvc_connection(alt_origin, origin, alt_params)
        end
        @response_received_at = Utils.now
        @inflight -= 1
        response.finish!
        request.emit(:response, response)
      end
      parser.on(:altsvc) do |alt_origin, origin, alt_params|
        build_altsvc_connection(alt_origin, origin, alt_params)
      end

      # pending requests are shoveled once the pong arrives (see #send).
      parser.on(:pong, &method(:send_pending))

      parser.on(:promise) do |request, stream|
        request.emit(:promise, parser, stream)
      end
      # the parser can't take more requests: the ones it still holds go back to the
      # pending list, and the connection goes back to idle. @exhausted makes
      # #disconnect a no-op while this happens.
      parser.on(:exhausted) do
        enqueue_pending_requests_from_parser(parser)

        @exhausted = true
        parser.close

        idling
        @exhausted = false
      end
      # additional origin this connection may be matched against (see #match?).
      parser.on(:origin) do |origin|
        @origins |= [origin]
      end
      parser.on(:close) do
        reset
        disconnect
      end
      parser.on(:close_handshake) do
        consume unless @state == :closed
      end
      parser.on(:reset) do
        enqueue_pending_requests_from_parser(parser)

        reset
        # :reset event only fired in http/1.1, so this guarantees
        # that the connection will be closed here.
        idling unless @pending.empty?
      end
      parser.on(:current_timeout) do
        @current_timeout = @timeout = parser.timeout
      end
      parser.on(:timeout) do |tout|
        @timeout = tout
      end
      parser.on(:error) do |request, error|
        case error
        when :http_1_1_required
          # the request is retried on a connection restricted to http/1.1.
          current_session = @current_session
          current_selector = @current_selector
          parser.close

          other_connection = current_session.find_connection(@origin, current_selector,
                                                             @options.merge(ssl: { alpn_protocols: %w[http/1.1] }))
          other_connection.merge(self)
          request.transition(:idle)
          other_connection.send(request)
          next
        when OperationTimeoutError
          # request level timeouts should take precedence
          next unless request.active_timeouts.empty?
        end

        # everything else fails the request with an error response.
        @inflight -= 1
        response = ErrorResponse.new(request, error)
        request.response = response
        request.emit(:response, response)
      end
    end

    # Moves the connection to +nextstate+ (see #handle_transition).
    #
    # Socket and protocol level failures raised along the way do not propagate:
    # they are reported as connect errors when the connection is still connecting,
    # and the connection is force-closed.
    def transition(nextstate)
      handle_transition(nextstate)
    rescue Errno::EADDRNOTAVAIL,
           Errno::ECONNABORTED,
           Errno::ECONNREFUSED,
           Errno::ECONNRESET,
           Errno::EHOSTUNREACH,
           Errno::EINVAL,
           Errno::ENETUNREACH,
           Errno::ENOENT,
           Errno::EPIPE,
           IOError,
           SocketError => socket_error
      # connect errors, exit gracefully
      connection_error = ConnectionError.new(socket_error.message)
      connection_error.set_backtrace(socket_error.backtrace)
      handle_connect_error(connection_error) if connecting?
      force_close
    rescue TLSError, ::HTTP2::Error::ProtocolError, ::HTTP2::Error::HandshakeError => protocol_error
      # connect errors, exit gracefully
      handle_error(protocol_error)
      handle_connect_error(protocol_error) if connecting?
      force_close
    end

    # The state machine of the connection.
    #
    # Carries out the work associated with moving to +nextstate+ and, unless a branch
    # bails out early (in which case the state is left as is), records the new state.
    # +:already_open+ and +:active+ are pseudo-states which both end up in +:open+.
    def handle_transition(nextstate)
      case nextstate
      when :idle
        @timeout = @current_timeout = @options.timeout[:connect_timeout]

        @connected_at = @response_received_at = nil
      when :open
        return if @state == :closed

        @io.connect
        close_sibling if @io.state == :connected

        # not connected yet, state stays as is.
        return unless @io.connected?

        @connected_at = Utils.now

        send_pending

        @timeout = @current_timeout = parser.timeout
        emit(:open)
      when :inactive
        return unless @state == :open

        # do not deactivate connection in use
        return if @inflight.positive? || @parser.waiting_for_ping?

        disconnect
      when :closing
        return unless connecting? || @state == :open

        unless @write_buffer.empty?
          # preset state before handshake, as error callbacks
          # may take it back here.
          @state = nextstate
          # handshakes, try sending
          consume
          @write_buffer.clear
          return
        end
      when :closed
        # only reachable from :closing, and once there is nothing left to write.
        return unless @state == :closing
        return unless @write_buffer.empty?

        purge_after_closed
        disconnect if @pending.empty?

      when :already_open
        nextstate = :open
        # the first check for given io readiness must still use a timeout.
        # connect is the reasonable choice in such a case.
        @timeout = @options.timeout[:connect_timeout]
        send_pending
      when :active
        return unless @state == :inactive

        nextstate = :open

        # activate
        @current_session.select_connection(self, @current_selector)
      end
      log(level: 3) { "#{@state} -> #{nextstate}" }
      @state = nextstate
    end

    # Lets go of the sibling connection, if there is one.
    #
    # This connection is reset when the sibling already has a connected socket. A
    # sibling which is not closed yet is force-reset as cloned; unless this connection
    # is the main sibling, the sibling is merged into it first.
    def close_sibling
      return unless @sibling

      # TODO: transition connection to closed
      reset if @sibling.io_connected?

      if @sibling.state != :closed
        merge(@sibling) unless @main_sibling
        @sibling.force_reset(true)
      end

      @sibling = nil
    end

    # Cleans up after closing: closes the socket (if any), empties the read buffer
    # and unsets the current timeout.
    def purge_after_closed
      socket = @io
      socket.close if socket
      @read_buffer.clear
      @timeout = nil
    end

    # Picks the transport type for the connection: the transport set in +options+ wins,
    # otherwise "tcp" for http and "ssl" for https URIs.
    #
    # Raises UnsupportedSchemeError for any other scheme.
    def initialize_type(uri, options)
      transport = options.transport
      return transport if transport

      case uri.scheme
      when "http" then "tcp"
      when "https" then "ssl"
      else
        raise UnsupportedSchemeError, "#{uri}: #{uri.scheme}: unsupported URI scheme"
      end
    end

    # returns an HTTPX::Connection for the negotiated Alternative Service (or none).
    #
    # +alt_origin+ is the advertised alternative origin, +origin+ the origin it was
    # advertised for, and +alt_params+ the advertised parameters. On success the
    # alternative connection is remembered, and it takes over this connection's
    # origins and pending requests (see #merge).
    def build_altsvc_connection(alt_origin, origin, alt_params)
      # only one alternative connection per connection.
      return if @altsvc_connection

      # do not allow security downgrades on altsvc negotiation
      return if @origin.scheme == "https" && alt_origin.scheme != "https"

      altsvc = AltSvc.cached_altsvc_set(origin, alt_params.merge("origin" => alt_origin))

      # altsvc already exists, somehow it wasn't advertised, probably noop
      return unless altsvc

      # the alternative connection is set up with the original host as ssl hostname.
      alt_options = @options.merge(ssl: @options.ssl.merge(hostname: URI(origin).host))

      connection = @current_session.find_connection(alt_origin, @current_selector, alt_options)

      # advertised altsvc is the same origin being used, ignore
      return if connection == self

      connection.extend(AltSvc::ConnectionMixin) unless connection.is_a?(AltSvc::ConnectionMixin)

      @altsvc_connection = connection

      log(level: 1) { "#{origin}: alt-svc connection##{connection.object_id} established to #{alt_origin}" }

      connection.merge(self)
    rescue UnsupportedSchemeError
      # the alternative can't be connected to: flag the entry as "noop".
      altsvc["noop"] = true
      nil
    end

    # Builds the socket object matching the transport type of the connection, for the
    # given +addrs+ (if any).
    #
    # SSL sockets start from the known ssl session (if any), and report back every
    # new session. UNIX sockets only use the first of +addrs+, as the path.
    #
    # Raises Error for unknown transport types.
    def build_socket(addrs = nil)
      case @type
      when "tcp" then TCP.new(peer, addrs, @options)
      when "unix"
        path = Array(addrs).first
        path &&= String(path)

        UNIX.new(peer, path, @options)
      when "ssl"
        SSL.new(peer, addrs, @options) do |sock|
          sock.ssl_session = @ssl_session
          sock.session_new_cb do |sess|
            @ssl_session = sess

            sock.ssl_session = sess
          end
        end
      else
        raise Error, "unsupported transport (#{@type})"
      end
    end

    # recover internal state and emit all relevant error responses when +error+ was raised.
    # this takes an optional +request+, which is skipped while the pending requests get
    # failed, and is failed last (also stopping to count as inflight).
    def handle_error(error, request = nil)
      parser.handle_error(error, request) if @parser && @parser.respond_to?(:handle_error)

      fail_with_error = lambda do |failed_request|
        response = ErrorResponse.new(failed_request, error)
        failed_request.response = response
        failed_request.emit(:response, response)
      end

      while (req = @pending.shift)
        fail_with_error.call(req) unless request && req == request
      end

      return unless request

      @inflight -= 1
      fail_with_error.call(request)
    end

    # Arms the write, read and request timeouts of +request+ (each one only when it
    # is set to a finite value).
    def set_request_timeouts(request)
      set_request_write_timeout(request)
      set_request_read_timeout(request)
      set_request_request_timeout(request)
    end

    # Arms the read timeout of +request+, unless it is unset or infinite. The timer
    # starts on the +:done+ event of the request and is cleaned up on +:response+.
    def set_request_read_timeout(request)
      interval = request.read_timeout

      return if interval.nil? || interval.infinite?

      set_request_timeout(:read_timeout, request, interval, :done, :response) { read_timeout_callback(request, interval) }
    end

    # Arms the write timeout of +request+, unless it is unset or infinite. The timer
    # starts on the +:headers+ event of the request and is cleaned up on +:done+ or
    # +:response+.
    def set_request_write_timeout(request)
      interval = request.write_timeout

      return if interval.nil? || interval.infinite?

      set_request_timeout(:write_timeout, request, interval, :headers, %i[done response]) { write_timeout_callback(request, interval) }
    end

    # Arms the request timeout of +request+, unless it is unset or infinite. The timer
    # starts on the +:headers+ event of the request and is cleaned up on +:complete+;
    # when it fires, the request fails with a RequestTimeoutError.
    def set_request_request_timeout(request)
      interval = request.request_timeout

      return if interval.nil? || interval.infinite?

      set_request_timeout(:request_timeout, request, interval, :headers, :complete) { read_timeout_callback(request, interval, RequestTimeoutError) }
    end

    # Fired when the write timeout of +request+ elapses. Unless the request is done
    # by then, the write buffer is dropped and the request fails with a
    # WriteTimeoutError.
    def write_timeout_callback(request, write_timeout)
      return if request.state == :done

      @write_buffer.clear

      on_error(WriteTimeoutError.new(request, nil, write_timeout), request)
    end

    # Fired when a read (or request) timeout of +request+ elapses. Unless the request
    # has a finished response by then, the write buffer is dropped and the request
    # fails with an +error_type+ error.
    def read_timeout_callback(request, read_timeout, error_type = ReadTimeoutError)
      current_response = request.response

      return if current_response && current_response.finished?

      @write_buffer.clear

      on_error(error_type.new(request, request.response, read_timeout), request)
    end

    # Arms a timeout named +label+ for +request+.
    #
    # Once +start_event+ is signaled on the request, +callback+ is scheduled on the
    # current selector to run after +timeout+, and +label+ is added to the active
    # timeouts of the request. Any of the +finish_events+ (one or many) cancels the
    # timer and removes the label again.
    #
    # Raises Error from the start event when the connection has no selector by then.
    def set_request_timeout(label, request, timeout, start_event, finish_events, &callback)
      request.set_timeout_callback(start_event) do
        selector = @current_selector

        unless selector
          raise Error, "request has been resend to an out-of-session connection, and this " \
                       "should never happen!!! Please report this error! " \
                       "(state:#{@state}, " \
                       "parser?:#{!!@parser}, " \
                       "bytes in write buffer?:#{!@write_buffer.empty?}, " \
                       "cloned?:#{@cloned}, " \
                       "sibling?:#{!!@sibling}, " \
                       "coalesced?:#{coalesced?})"
        end

        timer = selector.after(timeout, callback)
        request.active_timeouts << label

        Array(finish_events).each do |finish_event|
          # clean up request timeouts if the connection errors out
          request.set_timeout_callback(finish_event) do
            timer.cancel
            request.active_timeouts.delete(label)
          end
        end
      end
    end

    # Picks the parser class for the application +protocol+: the +http2_class+ from the
    # options for "h2", the +http1_class+ for "http/1.1".
    #
    # Raises Error for any other protocol.
    def parser_type(protocol)
      case protocol
      when "h2" then @options.http2_class
      when "http/1.1" then @options.http1_class
      else
        # the protocol is reported as is, in line with the "unsupported transport" error.
        raise Error, "unsupported protocol (#{protocol})"
      end
    end
  end
end