# frozen_string_literal: true
require "resolv"
require "forwardable"
require "httpx/io"
require "httpx/buffer"
module HTTPX
# The Connection can be watched for IO events.
#
# It contains the +io+ object to read/write from, and knows what to do when it can.
#
# It defers connecting until absolutely necessary. Connection should be triggered from
# the IO selector (until then, any request will be queued).
#
# A connection boots up its parser after connection is established. All pending requests
# will be redirected there after connection.
#
# A connection can be prevented from closing by the parser, that is, if there are pending
# requests. This signals that the connection was prematurely closed, which can happen
# under a number of conditions:
#
# * Remote peer closed the connection ("Connection: close");
# * Remote peer doesn't support pipelining.
#
# A connection may also route requests for a host other than the one the +io+ was connected
# to, provided that the IP, port and scheme are the same. This allows the same socket to be
# shared when sending HTTP/2 requests to different hosts.
#
class Connection
extend Forwardable
include Loggable
include Callbacks
using URIExtensions
def_delegator :@write_buffer, :empty?
attr_reader :type, :io, :origin, :origins, :state, :pending, :options, :ssl_session, :sibling
attr_writer :current_selector
attr_accessor :current_session, :family
protected :ssl_session, :sibling
def initialize(uri, options)
@current_session = @current_selector =
@parser = @sibling = @coalesced_connection = @altsvc_connection =
@family = @io = @ssl_session = @timeout =
@connected_at = @response_received_at = nil
@exhausted = @cloned = @main_sibling = false
@options = Options.new(options)
@type = initialize_type(uri, @options)
@origins = [uri.origin]
@origin = Utils.to_uri(uri.origin)
@window_size = @options.window_size
@read_buffer = Buffer.new(@options.buffer_size)
@write_buffer = Buffer.new(@options.buffer_size)
@pending = []
@inflight = 0
@keep_alive_timeout = @options.timeout[:keep_alive_timeout]
if @options.io
# if there's an already open IO, get its
# peer address, and force-initiate the parser
transition(:already_open)
@io = build_socket
parser
else
transition(:idle)
end
self.addresses = @options.addresses if @options.addresses
end
def peer
@origin
end
# this is a semi-private method, to be used by the resolver
# to initiate the io object.
def addresses=(addrs)
if @io
@io.add_addresses(addrs)
else
@io = build_socket(addrs)
end
end
def addresses
@io && @io.addresses
end
def addresses?
@io && @io.addresses?
end
def match?(uri, options)
return false if !used? && (@state == :closing || @state == :closed)
@origins.include?(uri.origin) &&
# if there is more than one origin to match, it means that this connection
# was the result of coalescing. To prevent blind trust in the case where the
# origin came from an ORIGIN frame, we're going to verify the hostname with the
# SSL certificate
(@origins.size == 1 || @origin == uri.origin || (@io.is_a?(SSL) && @io.verify_hostname(uri.host))) &&
@options == options
end
def mergeable?(connection)
return false if @state == :closing || @state == :closed || !@io
return false unless connection.addresses
(
(open? && @origin == connection.origin) ||
!(@io.addresses & (connection.addresses || [])).empty?
) && @options == connection.options
end
# coalesces +self+ into +connection+.
def coalesce!(connection)
@coalesced_connection = connection
close_sibling
connection.merge(self)
end
def coalesced?
@coalesced_connection
end
# coalescable connections need to be mergeable!
# but internally, #mergeable? is called before #coalescable?
def coalescable?(connection)
if @io.protocol == "h2" &&
@origin.scheme == "https" &&
connection.origin.scheme == "https" &&
@io.can_verify_peer?
@io.verify_hostname(connection.origin.host)
else
@origin == connection.origin
end
end
def merge(connection)
@origins |= connection.instance_variable_get(:@origins)
if @ssl_session.nil? && connection.ssl_session
@ssl_session = connection.ssl_session
@io.session_new_cb do |sess|
@ssl_session = sess
end if @io
end
connection.purge_pending do |req|
send(req)
end
end
def purge_pending(&block)
pendings = []
if @parser
@inflight -= @parser.pending.size
pendings << @parser.pending
end
pendings << @pending
pendings.each do |pending|
pending.reject!(&block)
end
end
def io_connected?
return @coalesced_connection.io_connected? if @coalesced_connection
@io && @io.state == :connected
end
def connecting?
@state == :idle
end
def inflight?
@parser && (
# parser may be dealing with other requests (possibly started from a different fiber)
!@parser.empty? ||
# connection may be doing connection termination handshake
!@write_buffer.empty?
)
end
def interests
# connecting
if connecting?
connect
return @io.interests if connecting?
end
return @parser.interests if @parser
nil
rescue StandardError => e
on_error(e)
nil
end
def to_io
@io.to_io
end
def call
case @state
when :idle
connect
# when opening the tcp or ssl socket fails
return if @state == :closed
consume
when :closed
return
when :closing
consume
transition(:closed)
when :open
consume
end
nil
rescue StandardError => e
@write_buffer.clear
on_error(e)
rescue Exception => e # rubocop:disable Lint/RescueException
force_close(true)
raise e
end
def close
transition(:active) if @state == :inactive
@parser.close if @parser
end
def terminate
case @state
when :idle
purge_after_closed
disconnect
when :closed
@connected_at = nil
end
close
end
# bypasses state machine rules while setting the connection in the
# :closed state.
def force_close(delete_pending = false)
if delete_pending
@pending.clear
elsif (parser = @parser)
enqueue_pending_requests_from_parser(parser)
end
return if @state == :closed
@state = :closed
@write_buffer.clear
purge_after_closed
disconnect
emit(:force_closed, delete_pending)
end
# bypasses the state machine to force closing of connections still connecting.
# **only** used for Happy Eyeballs v2.
def force_reset(cloned = false)
@state = :closing
@cloned = cloned
transition(:closed)
end
def reset
return if @state == :closing || @state == :closed
transition(:closing)
transition(:closed)
end
def send(request)
return @coalesced_connection.send(request) if @coalesced_connection
if @parser && !@write_buffer.full?
if @response_received_at && @keep_alive_timeout &&
Utils.elapsed_time(@response_received_at) > @keep_alive_timeout
# when pushing a request into an existing connection, the connection may have been quiet
# for longer than the keep alive timeout, in which case the peer may have closed it already.
# for such cases, we want to ping for availability before deciding to shovel requests.
log(level: 3) { "keep alive timeout expired, pinging connection..." }
@pending << request
transition(:active) if @state == :inactive
parser.ping
request.ping!
return
end
send_request_to_parser(request)
else
@pending << request
end
end
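=begin
a minimal sketch of the keep-alive check in #send, detached from the library. sketch_now and
sketch_ping_first? are hypothetical names, and the monotonic clock is an assumption about
what Utils.now and Utils.elapsed_time are built on.

```ruby
# monotonic clock reading in seconds (assumed to be what Utils.now wraps).
def sketch_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# true when the last response is older than the keep-alive window, i.e. when the
# request should be parked in the pending list behind a ping.
def sketch_ping_first?(response_received_at, keep_alive_timeout, now = sketch_now)
  return false unless response_received_at && keep_alive_timeout

  (now - response_received_at) > keep_alive_timeout
end

last_response = sketch_now - 30 # pretend the last response arrived 30 seconds ago
puts sketch_ping_first?(last_response, 20) # the 20s window has passed
puts sketch_ping_first?(last_response, 60) # still inside a 60s window
```

a connection which never received a response has no timestamp, so it is never pinged first.
=end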
def timeout
return if @state == :closed || @state == :inactive
return @timeout if @timeout
return @options.timeout[:connect_timeout] if @state == :idle
@options.timeout[:operation_timeout]
end
def idling
purge_after_closed
@write_buffer.clear
transition(:idle)
@parser = nil if @parser
end
def used?
@connected_at
end
def deactivate
transition(:inactive)
end
def open?
@state == :open || @state == :inactive
end
def handle_socket_timeout(interval)
error = OperationTimeoutError.new(interval, "timed out while waiting on select")
error.set_backtrace(caller)
on_error(error)
end
def sibling=(connection)
@sibling = connection
return unless connection
@main_sibling = connection.sibling.nil?
return unless @main_sibling
connection.sibling = self
end
def handle_connect_error(error)
return on_error(error) unless @sibling && @sibling.connecting?
@sibling.merge(self)
force_reset(true)
end
# disconnects from the current session it's attached to
def disconnect
return if @exhausted # it'll reset
return unless (current_session = @current_session) && (current_selector = @current_selector)
@current_session = @current_selector = nil
current_session.deselect_connection(self, current_selector, @cloned)
end
def on_error(error, request = nil)
if error.is_a?(OperationTimeoutError)
# inactive connections do not contribute to the select loop, therefore
# they should not fail due to such errors.
return if @state == :inactive
if @timeout
@timeout -= error.timeout
return unless @timeout <= 0
end
error = error.to_connection_error if connecting?
end
handle_error(error, request)
reset
end
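=begin
a minimal sketch of how #on_error spends the timeout budget on operation timeouts.
sketch_consume_timeout is a hypothetical helper, not part of the library: each select
timeout subtracts the waited interval, and the error is only raised once nothing is left.

```ruby
# returns [remaining_budget, expired?] after waiting +waited+ seconds.
def sketch_consume_timeout(budget, waited)
  # no budget is being tracked: the error is handled right away.
  return [nil, true] if budget.nil?

  remaining = budget - waited
  [remaining, remaining <= 0]
end

budget = 5
budget, expired = sketch_consume_timeout(budget, 2)
puts "remaining=#{budget} expired=#{expired}"
budget, expired = sketch_consume_timeout(budget, 3)
puts "remaining=#{budget} expired=#{expired}"
```
=end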
# :nocov:
def inspect
"#<#{self.class}:#{object_id} " \
"@origin=#{@origin} " \
"@state=#{@state} " \
"@pending=#{@pending.size} " \
"@io=#{@io}>"
end
# :nocov:
private
def connect
transition(:open)
end
def consume
return unless @io
catch(:called) do
epiped = false
loop do
# connection may have gone back to idle in the meantime (e.g. after the parser was exhausted)
return if @state == :idle
parser.consume
# we exit if there's no more requests to process
#
# this condition takes into account:
#
# * the number of inflight requests
# * the number of pending requests
# * whether the write buffer has bytes (i.e. for close handshake)
if @pending.empty? && @inflight.zero? && @write_buffer.empty?
log(level: 3) { "NO MORE REQUESTS..." } if @parser && @parser.pending.any?
# terminate if an altsvc connection has been established
terminate if @altsvc_connection
return
end
@timeout = @current_timeout
read_drained = false
write_drained = nil
#
# tight read loop.
#
# read as much of the socket as possible.
#
# this tight loop reads all the data it can from the socket and pipes it to
# its parser.
#
loop do
siz = @io.read(@window_size, @read_buffer)
log(level: 3, color: :cyan) { "IO READ: #{siz} bytes... (wsize: #{@window_size}, rbuffer: #{@read_buffer.bytesize})" }
unless siz
@write_buffer.clear
ex = EOFError.new("descriptor closed")
ex.set_backtrace(caller)
on_error(ex)
return
end
# socket has been drained. mark and exit the read loop.
if siz.zero?
read_drained = @read_buffer.empty?
epiped = false
break
end
parser << @read_buffer.to_s
# continue reading if possible.
break if interests == :w && !epiped
# exit the read loop if connection is preparing to be closed
break if @state == :closing || @state == :closed
# exit #consume altogether if all outstanding requests have been dealt with
if @pending.empty? && @inflight.zero? && @write_buffer.empty? # rubocop:disable Style/Next
log(level: 3) { "NO MORE REQUESTS..." } if @parser && @parser.pending.any?
# terminate if an altsvc connection has been established
terminate if @altsvc_connection
return
end
end unless ((ints = interests).nil? || ints == :w || @state == :closing) && !epiped
#
# tight write loop.
#
# flush as many bytes as the sockets allow.
#
loop do
# buffer has been drained, mark and exit the write loop.
if @write_buffer.empty?
# we only mark as drained on the first loop
write_drained = write_drained.nil? && @inflight.positive?
break
end
begin
siz = @io.write(@write_buffer)
rescue Errno::EPIPE
# this can happen if we still have bytes in the buffer to send to the server, but
# the server wants to respond immediately with some message, or an error. An example is
# when one's uploading a big file to an unintended endpoint, and the server stops the
# consumption, and responds immediately with an authorization or even a method not allowed error.
# at this point, we have to let the connection switch to read-mode.
log(level: 2) { "pipe broken, could not flush buffer..." }
epiped = true
read_drained = false
break
end
log(level: 3, color: :cyan) { "IO WRITE: #{siz} bytes..." }
unless siz
@write_buffer.clear
ex = EOFError.new("descriptor closed")
ex.set_backtrace(caller)
on_error(ex)
return
end
# socket closed for writing. mark and exit the write loop.
if siz.zero?
write_drained = !@write_buffer.empty?
break
end
# exit write loop if marked to consume from peer, or is closing.
break if interests == :r || @state == :closing || @state == :closed
write_drained = false
end unless (ints = interests) == :r
send_pending if @state == :open
# return if socket is drained
next unless (ints != :r || read_drained) && (ints != :w || write_drained)
# gotta go back to the event loop. It happens when:
#
# * the socket is drained of bytes or it's not the interest of the conn to read;
# * there's nothing more to write, or it's not in the interest of the conn to write.
log(level: 3) { "(#{ints}): WAITING FOR EVENTS..." }
return
end
end
end
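=begin
a minimal sketch of the condition #consume uses to decide whether to go back to the event
loop. sketch_back_to_event_loop? is a hypothetical name; it only restates the boolean
expression guarding the final return, with the same three inputs.

```ruby
# +interests+ is :r, :w or :rw; the drained flags say whether the last read/write
# pass had nothing left to do.
def sketch_back_to_event_loop?(interests, read_drained, write_drained)
  read_ok = interests != :r || read_drained
  write_ok = interests != :w || write_drained
  read_ok && write_ok ? true : false
end

puts sketch_back_to_event_loop?(:r, true, nil)   # read-only and socket drained
puts sketch_back_to_event_loop?(:r, false, nil)  # read-only but bytes may remain
puts sketch_back_to_event_loop?(:w, false, true) # write-only and nothing left to flush
```

note that with :rw neither side can hold the loop back, so the method returns to the
selector and waits for the next readiness event.
=end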
def send_pending
while !@write_buffer.full? && (request = @pending.shift)
send_request_to_parser(request)
end
end
def parser
@parser ||= build_parser
end
def send_request_to_parser(request)
@inflight += 1
request.peer_address = @io.ip && @io.ip.address
set_request_timeouts(request)
parser.send(request)
return unless @state == :inactive
transition(:active)
# mark request as ping, as this inactive connection may have been
# closed by the server, and we don't want that to influence retry
# bookkeeping.
request.ping!
end
def enqueue_pending_requests_from_parser(parser)
parser_pending_requests = parser.pending
return if parser_pending_requests.empty?
# the connection will be reused, so parser requests must come
# back to the pending list before the parser is reset.
@inflight -= parser_pending_requests.size
@pending.unshift(*parser_pending_requests)
end
def build_parser(protocol = @io.protocol)
parser = parser_type(protocol).new(@write_buffer, @options)
set_parser_callbacks(parser)
parser
end
def set_parser_callbacks(parser)
parser.on(:response) do |request, response|
AltSvc.emit(request, response) do |alt_origin, origin, alt_params|
build_altsvc_connection(alt_origin, origin, alt_params)
end
@response_received_at = Utils.now
@inflight -= 1
response.finish!
request.emit(:response, response)
end
parser.on(:altsvc) do |alt_origin, origin, alt_params|
build_altsvc_connection(alt_origin, origin, alt_params)
end
parser.on(:pong, &method(:send_pending))
parser.on(:promise) do |request, stream|
request.emit(:promise, parser, stream)
end
parser.on(:exhausted) do
enqueue_pending_requests_from_parser(parser)
@exhausted = true
parser.close
idling
@exhausted = false
end
parser.on(:origin) do |origin|
@origins |= [origin]
end
parser.on(:close) do
reset
disconnect
end
parser.on(:close_handshake) do
consume unless @state == :closed
end
parser.on(:reset) do
enqueue_pending_requests_from_parser(parser)
reset
# :reset event only fired in http/1.1, so this guarantees
# that the connection will be closed here.
idling unless @pending.empty?
end
parser.on(:current_timeout) do
@current_timeout = @timeout = parser.timeout
end
parser.on(:timeout) do |tout|
@timeout = tout
end
parser.on(:error) do |request, error|
case error
when :http_1_1_required
current_session = @current_session
current_selector = @current_selector
parser.close
other_connection = current_session.find_connection(@origin, current_selector,
@options.merge(ssl: { alpn_protocols: %w[http/1.1] }))
other_connection.merge(self)
request.transition(:idle)
other_connection.send(request)
next
when OperationTimeoutError
# request level timeouts should take precedence
next unless request.active_timeouts.empty?
end
@inflight -= 1
response = ErrorResponse.new(request, error)
request.response = response
request.emit(:response, response)
end
end
def transition(nextstate)
handle_transition(nextstate)
rescue Errno::ECONNABORTED,
Errno::ECONNREFUSED,
Errno::ECONNRESET,
Errno::EADDRNOTAVAIL,
Errno::EHOSTUNREACH,
Errno::EINVAL,
Errno::ENETUNREACH,
Errno::EPIPE,
Errno::ENOENT,
SocketError,
IOError => e
# connect errors, exit gracefully
error = ConnectionError.new(e.message)
error.set_backtrace(e.backtrace)
handle_connect_error(error) if connecting?
force_close
rescue TLSError, ::HTTP2::Error::ProtocolError, ::HTTP2::Error::HandshakeError => e
# connect errors, exit gracefully
handle_error(e)
handle_connect_error(e) if connecting?
force_close
end
def handle_transition(nextstate)
case nextstate
when :idle
@timeout = @current_timeout = @options.timeout[:connect_timeout]
@connected_at = @response_received_at = nil
when :open
return if @state == :closed
@io.connect
close_sibling if @io.state == :connected
return unless @io.connected?
@connected_at = Utils.now
send_pending
@timeout = @current_timeout = parser.timeout
emit(:open)
when :inactive
return unless @state == :open
# do not deactivate connection in use
return if @inflight.positive? || @parser.waiting_for_ping?
disconnect
when :closing
return unless connecting? || @state == :open
unless @write_buffer.empty?
# preset state before handshake, as error callbacks
# may take it back here.
@state = nextstate
# handshakes, try sending
consume
@write_buffer.clear
return
end
when :closed
return unless @state == :closing
return unless @write_buffer.empty?
purge_after_closed
disconnect if @pending.empty?
when :already_open
nextstate = :open
# the first check for given io readiness must still use a timeout.
# connect is the reasonable choice in such a case.
@timeout = @options.timeout[:connect_timeout]
send_pending
when :active
return unless @state == :inactive
nextstate = :open
# activate
@current_session.select_connection(self, @current_selector)
end
log(level: 3) { "#{@state} -> #{nextstate}" }
@state = nextstate
end
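=begin
a minimal sketch of the state guards in #handle_transition, detached from the library.
the constant and method names are hypothetical, and the IO, write buffer and in-flight
checks of the real method are left out; only "which state may follow which" is modelled.

```ruby
# per requested state: the current states from which it may be entered.
SKETCH_ALLOWED_FROM = {
  inactive: %i[open],
  closing: %i[idle open],
  closed: %i[closing],
  active: %i[inactive]
}.freeze

# requested states which are recorded under another name.
SKETCH_ALIASES = { already_open: :open, active: :open }.freeze

# returns the state after requesting +nextstate+ while in +state+.
# a refused transition leaves the state untouched.
def sketch_transition(state, nextstate)
  allowed = SKETCH_ALLOWED_FROM[nextstate]
  return state if allowed && !allowed.include?(state)
  return state if nextstate == :open && state == :closed

  SKETCH_ALIASES.fetch(nextstate, nextstate)
end

state = :idle
%i[open inactive active closing closed].each do |requested|
  state = sketch_transition(state, requested)
  puts "#{requested} -> #{state}"
end
```

this is why #reset requests :closing and then :closed in sequence: :closed is only
reachable from :closing.
=end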
def close_sibling
return unless @sibling
if @sibling.io_connected?
reset
# TODO: transition connection to closed
end
unless @sibling.state == :closed
merge(@sibling) unless @main_sibling
@sibling.force_reset(true)
end
@sibling = nil
end
def purge_after_closed
@io.close if @io
@read_buffer.clear
@timeout = nil
end
def initialize_type(uri, options)
options.transport || begin
case uri.scheme
when "http"
"tcp"
when "https"
"ssl"
else
raise UnsupportedSchemeError, "#{uri}: #{uri.scheme}: unsupported URI scheme"
end
end
end
# returns an HTTPX::Connection for the negotiated Alternative Service (or none).
def build_altsvc_connection(alt_origin, origin, alt_params)
return if @altsvc_connection
# do not allow security downgrades on altsvc negotiation
return if @origin.scheme == "https" && alt_origin.scheme != "https"
altsvc = AltSvc.cached_altsvc_set(origin, alt_params.merge("origin" => alt_origin))
# altsvc already exists, somehow it wasn't advertised, probably noop
return unless altsvc
alt_options = @options.merge(ssl: @options.ssl.merge(hostname: URI(origin).host))
connection = @current_session.find_connection(alt_origin, @current_selector, alt_options)
# advertised altsvc is the same origin being used, ignore
return if connection == self
connection.extend(AltSvc::ConnectionMixin) unless connection.is_a?(AltSvc::ConnectionMixin)
@altsvc_connection = connection
log(level: 1) { "#{origin}: alt-svc connection##{connection.object_id} established to #{alt_origin}" }
connection.merge(self)
rescue UnsupportedSchemeError
altsvc["noop"] = true
nil
end
def build_socket(addrs = nil)
case @type
when "tcp"
TCP.new(peer, addrs, @options)
when "ssl"
SSL.new(peer, addrs, @options) do |sock|
sock.ssl_session = @ssl_session
sock.session_new_cb do |sess|
@ssl_session = sess
sock.ssl_session = sess
end
end
when "unix"
path = Array(addrs).first
path = String(path) if path
UNIX.new(peer, path, @options)
else
raise Error, "unsupported transport (#{@type})"
end
end
# recover internal state and emit all relevant error responses when +error+ was raised.
# this takes an optional +request+ which may have already been handled and can be opted out
# in the state recovery process.
def handle_error(error, request = nil)
parser.handle_error(error, request) if @parser && @parser.respond_to?(:handle_error)
while (req = @pending.shift)
next if request && req == request
response = ErrorResponse.new(req, error)
req.response = response
req.emit(:response, response)
end
return unless request
@inflight -= 1
response = ErrorResponse.new(request, error)
request.response = response
request.emit(:response, response)
end
def set_request_timeouts(request)
set_request_write_timeout(request)
set_request_read_timeout(request)
set_request_request_timeout(request)
end
def set_request_read_timeout(request)
read_timeout = request.read_timeout
return if read_timeout.nil? || read_timeout.infinite?
set_request_timeout(:read_timeout, request, read_timeout, :done, :response) do
read_timeout_callback(request, read_timeout)
end
end
def set_request_write_timeout(request)
write_timeout = request.write_timeout
return if write_timeout.nil? || write_timeout.infinite?
set_request_timeout(:write_timeout, request, write_timeout, :headers, %i[done response]) do
write_timeout_callback(request, write_timeout)
end
end
def set_request_request_timeout(request)
request_timeout = request.request_timeout
return if request_timeout.nil? || request_timeout.infinite?
set_request_timeout(:request_timeout, request, request_timeout, :headers, :complete) do
read_timeout_callback(request, request_timeout, RequestTimeoutError)
end
end
def write_timeout_callback(request, write_timeout)
return if request.state == :done
@write_buffer.clear
error = WriteTimeoutError.new(request, nil, write_timeout)
on_error(error, request)
end
def read_timeout_callback(request, read_timeout, error_type = ReadTimeoutError)
response = request.response
return if response && response.finished?
@write_buffer.clear
error = error_type.new(request, request.response, read_timeout)
on_error(error, request)
end
def set_request_timeout(label, request, timeout, start_event, finish_events, &callback)
request.set_timeout_callback(start_event) do
unless @current_selector
raise Error, "request has been resent to an out-of-session connection, and this " \
"should never happen!!! Please report this error! " \
"(state:#{@state}, " \
"parser?:#{!!@parser}, " \
"bytes in write buffer?:#{!@write_buffer.empty?}, " \
"cloned?:#{@cloned}, " \
"sibling?:#{!!@sibling}, " \
"coalesced?:#{coalesced?})"
end
timer = @current_selector.after(timeout, callback)
request.active_timeouts << label
Array(finish_events).each do |event|
# clean up request timeouts if the connection errors out
request.set_timeout_callback(event) do
timer.cancel
request.active_timeouts.delete(label)
end
end
end
end
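=begin
a minimal sketch of the timer lifecycle set up by #set_request_timeout, detached from the
library. SketchTimer, SketchRequest and sketch_arm_timeout are hypothetical stand-ins; the
real method schedules the timer on the selector, which is not modelled here.

```ruby
# hypothetical timer: only records whether it was cancelled.
class SketchTimer
  attr_reader :cancelled

  def initialize
    @cancelled = false
  end

  def cancel
    @cancelled = true
  end
end

# hypothetical request: holds one-shot callbacks per event name.
class SketchRequest
  attr_reader :active_timeouts

  def initialize
    @callbacks = Hash.new { |hash, key| hash[key] = [] }
    @active_timeouts = []
  end

  def on(event, &blk)
    @callbacks[event] << blk
  end

  # runs and discards the callbacks registered for +event+, if any.
  def emit(event)
    @callbacks.delete(event)&.each(&:call)
  end
end

# the timer is created when +start_event+ fires, and cancelled by whichever
# of the +finish_events+ fires first. returns the list of timers created.
def sketch_arm_timeout(label, request, start_event, finish_events)
  timers = []
  request.on(start_event) do
    timer = SketchTimer.new
    timers << timer
    request.active_timeouts << label
    Array(finish_events).each do |event|
      request.on(event) do
        timer.cancel
        request.active_timeouts.delete(label)
      end
    end
  end
  timers
end

req = SketchRequest.new
timers = sketch_arm_timeout(:write_timeout, req, :headers, %i[done response])
req.emit(:headers)
puts req.active_timeouts.inspect # label is tracked while the timer is armed
req.emit(:done)
puts timers.first.cancelled      # the finish event cancelled the timer
```
=end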
def parser_type(protocol)
case protocol
when "h2" then @options.http2_class
when "http/1.1" then @options.http1_class
else
raise Error, "unsupported protocol (##{protocol})"
end
end
end
end