1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204
|
# frozen_string_literal: true
#
# Copyright, 2017, by Samuel G. D. Williams. <http://www.codeotaku.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
require 'async/io/endpoint'
require 'async/io/stream'
require 'async/pool/controller'
require 'protocol/http/body/completable'
require 'protocol/http/methods'
require 'traces/provider'
require_relative 'protocol'
module Async
module HTTP
DEFAULT_RETRIES = 3
DEFAULT_CONNECTION_LIMIT = nil
class Client < ::Protocol::HTTP::Methods
# Provides a robust interface to a server.
# * If there are no connections, it will create one.
# * If there are already connections, it will reuse it.
# * If a request fails, it will retry it up to N times if it was idempotent.
# The client object will never become unusable. It internally manages persistent connections (or non-persistent connections if that's required).
# @param endpoint [Endpoint] the endpoint to connnect to.
# @param protocol [Protocol::HTTP1 | Protocol::HTTP2 | Protocol::HTTPS] the protocol to use.
# @param scheme [String] The default scheme to set to requests.
# @param authority [String] The default authority to set to requests.
def initialize(endpoint, protocol: endpoint.protocol, scheme: endpoint.scheme, authority: endpoint.authority, retries: DEFAULT_RETRIES, connection_limit: DEFAULT_CONNECTION_LIMIT)
@endpoint = endpoint
@protocol = protocol
@retries = retries
@pool = make_pool(connection_limit)
@scheme = scheme
@authority = authority
end
attr :endpoint
attr :protocol
attr :retries
attr :pool
attr :scheme
attr :authority
def secure?
@endpoint.secure?
end
def self.open(*arguments, **options, &block)
client = self.new(*arguments, **options)
return client unless block_given?
begin
yield client
ensure
client.close
end
end
def close
while @pool.busy?
Console.logger.warn(self) {"Waiting for #{@protocol} pool to drain: #{@pool}"}
@pool.wait
end
@pool.close
end
def call(request)
request.scheme ||= self.scheme
request.authority ||= self.authority
attempt = 0
# We may retry the request if it is possible to do so. https://tools.ietf.org/html/draft-nottingham-httpbis-retry-01 is a good guide for how retrying requests should work.
begin
attempt += 1
# As we cache pool, it's possible these pool go bad (e.g. closed by remote host). In this case, we need to try again. It's up to the caller to impose a timeout on this. If this is the last attempt, we force a new connection.
connection = @pool.acquire
response = make_response(request, connection)
# This signals that the ensure block below should not try to release the connection, because it's bound into the response which will be returned:
connection = nil
return response
rescue Protocol::RequestFailed
# This is a specific case where the entire request wasn't sent before a failure occurred. So, we can even resend non-idempotent requests.
if connection
@pool.release(connection)
connection = nil
end
if attempt < @retries
retry
else
raise
end
rescue SocketError, IOError, EOFError, Errno::ECONNRESET, Errno::EPIPE
if connection
@pool.release(connection)
connection = nil
end
if request.idempotent? and attempt < @retries
retry
else
raise
end
ensure
@pool.release(connection) if connection
end
end
def inspect
"#<#{self.class} authority=#{@authority.inspect}>"
end
Traces::Provider(self) do
def call(request)
attributes = {
'http.method': request.method,
'http.authority': request.authority || self.authority,
'http.scheme': request.scheme || self.scheme,
'http.path': request.path,
}
if protocol = request.protocol
attributes['http.protocol'] = protocol
end
if length = request.body&.length
attributes['http.request.length'] = length
end
trace('async.http.client.call', attributes: attributes) do |span|
if context = self.trace_context
request.headers['traceparent'] = context.to_s
# request.headers['tracestate'] = context.state
end
super.tap do |response|
if status = response&.status
span['http.status_code'] = status
end
if length = response.body&.length
span['http.response.length'] = length
end
end
end
end
end
protected
def make_response(request, connection)
response = request.call(connection)
# The connection won't be released until the body is completely read/released.
::Protocol::HTTP::Body::Completable.wrap(response) do
@pool.release(connection)
end
return response
end
def make_pool(connection_limit)
Async::Pool::Controller.wrap(limit: connection_limit) do
Console.logger.debug(self) {"Making connection to #{@endpoint.inspect}"}
@protocol.client(@endpoint.connect)
end
end
end
end
end
|