# HTTP connection plumbing for EventMachine: request verb helpers, the
# socket-level stub connection, and the per-connection request manager.
require 'em/io_streamer'
module EventMachine
# Verb helpers meant to be mixed into an object that implements
# #setup_request(method, options, &blk). Each helper simply forwards its
# options hash and optional block together with the verb symbol.
module HTTPMethods
  # Dispatches a :get request through #setup_request.
  def get(options = {}, &blk)
    setup_request(:get, options, &blk)
  end

  # Dispatches a :head request through #setup_request.
  def head(options = {}, &blk)
    setup_request(:head, options, &blk)
  end

  # Dispatches a :delete request through #setup_request.
  def delete(options = {}, &blk)
    setup_request(:delete, options, &blk)
  end

  # Dispatches a :put request through #setup_request.
  def put(options = {}, &blk)
    setup_request(:put, options, &blk)
  end

  # Dispatches a :post request through #setup_request.
  def post(options = {}, &blk)
    setup_request(:post, options, &blk)
  end

  # Dispatches a :patch request through #setup_request.
  def patch(options = {}, &blk)
    setup_request(:patch, options, &blk)
  end

  # Dispatches an :options request through #setup_request. Inside the body
  # `options` refers to the parameter, not to this method.
  def options(options = {}, &blk)
    setup_request(:options, options, &blk)
  end
end
# Socket-level connection handler. It holds no HTTP logic of its own: data,
# connect and unbind events are forwarded to the owning +parent+ object
# (an HttpConnection assigns itself via #parent=). The TLS hooks below check
# the peer certificate and hostname when the parent's connection options
# enable verify_peer.
class HttpStubConnection < Connection
  include Deferrable

  attr_reader :parent

  # Stores the owner and registers this connection on it via #conn=.
  def parent=(p)
    @parent = p
    @parent.conn = self
  end

  # Hands incoming bytes to the parent. A CONNECT proxy error raised while
  # processing them closes the parent with the error's message.
  def receive_data(data)
    @parent.receive_data data
  rescue EventMachine::Connectify::CONNECTError => e
    @parent.close(e.message)
  end

  # Forwards the "socket connected" event to the parent.
  def connection_completed
    @parent.connection_completed
  end

  # Forwards the "socket closed" event (and optional reason) to the parent.
  def unbind(reason=nil)
    @parent.unbind(reason)
  end

  # TLS verification support, original implementation by Mislav Marohnić
  # https://github.com/lostisland/faraday/blob/63cf47c95b573539f047c729bd9ad67560bc83ff/lib/faraday/adapter/em_http_ssl_patch.rb
  #
  # Parses +cert_string+ and verifies it against #certificate_store.
  # Returns false when the string is not a parseable certificate, true when
  # the certificate verifies (it is then added to the store), and raises
  # OpenSSL::SSL::SSLError when verification fails.
  def ssl_verify_peer(cert_string)
    begin
      # Only remembered when parsing succeeds.
      @last_seen_cert = OpenSSL::X509::Certificate.new(cert_string)
    rescue OpenSSL::X509::CertificateError
      return false
    end

    unless certificate_store.verify(@last_seen_cert)
      raise OpenSSL::SSL::SSLError, %(unable to verify the server certificate for "#{host}")
    end

    begin
      certificate_store.add_cert(@last_seen_cert)
    rescue OpenSSL::X509::StoreError => e
      # A duplicate certificate is fine; anything else is re-raised.
      raise e unless e.message == 'cert already in hash table'
    end
    true
  end

  # Checks that the last certificate seen by #ssl_verify_peer matches the
  # target host. When verify_peer is off this returns true without checking,
  # emitting a warning if the caller never specified :verify_peer at all.
  # Raises OpenSSL::SSL::SSLError on a hostname mismatch.
  def ssl_handshake_completed
    if verify_peer?
      return true if OpenSSL::SSL.verify_certificate_identity(@last_seen_cert, host)

      raise OpenSSL::SSL::SSLError, %(host "#{host}" does not match the server certificate)
    end

    tls_options = parent.connopts.tls
    unless tls_options.has_key?(:verify_peer)
      warn "[WARNING; em-http-request] TLS hostname validation is disabled (use 'tls: {verify_peer: true}'), see" +
        " CVE-2020-13482 and https://github.com/igrigorik/em-http-request/issues/339 for details"
    end
    true
  end

  # The :verify_peer entry of the parent's TLS options (truthy when enabled).
  def verify_peer?
    parent.connopts.tls[:verify_peer]
  end

  # Host taken from the parent's connection options.
  def host
    parent.connopts.host
  end

  # Memoized certificate store: system default paths plus, when present, the
  # file named by the :cert_chain_file TLS option.
  def certificate_store
    @certificate_store ||= OpenSSL::X509::Store.new.tap do |store|
      store.set_default_paths
      chain_file = parent.connopts.tls[:cert_chain_file]
      store.add_file(chain_file) if chain_file
    end
  end
end
# Manages one logical HTTP connection: it opens the socket (through
# HttpStubConnection), feeds received bytes into an HTTP parser, and routes
# parser events to the queue of client requests in flight.
#
# State:
#   @deferred - true while no socket is attached; the next request then
#               triggers #activate_connection.
#   @clients  - requests currently associated with the socket (FIFO).
#   @pending  - redirected non-keepalive requests waiting for a reconnect.
class HttpConnection
  include HTTPMethods
  include Socksify
  include Connectify

  attr_reader :deferred, :conn
  attr_accessor :error, :connopts, :uri

  def initialize
    @deferred = true
    @middleware = []
    # Create the queues up front so #unbind / #close is safe to call before
    # any socket has been opened. #post_init re-creates them on each connect.
    @clients = []
    @pending = []
  end

  # Attaches the socket-level connection and marks this object as active.
  def conn=(c)
    @conn = c
    @deferred = false
  end

  # Opens the socket for +client+ using the connection options, wires the
  # stub connection to this object, applies timeouts and queues the request.
  def activate_connection(client)
    EventMachine.bind_connect(@connopts.bind, @connopts.bind_port,
                              @connopts.host, @connopts.port,
                              HttpStubConnection) do |conn|
      post_init

      @deferred = false
      @conn = conn

      conn.parent = self
      conn.pending_connect_timeout = @connopts.connect_timeout
      conn.comm_inactivity_timeout = @connopts.inactivity_timeout
    end

    finalize_request(client)
  rescue EventMachine::ConnectionError => e
    #
    # Currently, this can only fire on initial connection setup
    # since #connect is a synchronous method. Hence, rescue the exception,
    # and return a failed deferred which fails any client request at next
    # tick. We fail at next tick to keep a consistent API when the newly
    # created HttpClient is failed. This approach has the advantage of
    # removing a state check of @deferred_status after creating a new
    # HttpRequest. The drawback is that users may set up a callback which we
    # know won't be used.
    #
    # Once there is async-DNS, then we'll iterate over the outstanding
    # client requests and fail them in order.
    #
    # Net outcome: failed connection will invoke the same ConnectionError
    # message on the connection deferred, and on the client deferred.
    #
    EM.next_tick { client.close(e.message) }
  end

  # Builds a client for +method+ (unless +c+ is supplied) and either opens
  # the connection for it or queues it on the already-open one.
  # Returns the client.
  def setup_request(method, options = {}, c = nil)
    c ||= HttpClient.new(self, HttpClientOptions.new(@uri, options, method))
    @deferred ? activate_connection(c) : finalize_request(c)
    c
  end

  # Schedules +c+ to start once the socket is ready, registers response
  # middleware callbacks on it and appends it to the in-flight queue.
  def finalize_request(c)
    @conn.callback { c.connection_completed }

    middleware.each do |m|
      c.callback(&m.method(:response)) if m.respond_to?(:response)
    end

    @clients.push c
  end

  # Global HttpRequest middleware followed by this connection's own.
  def middleware
    [HttpRequest.middleware, @middleware].flatten
  end

  # Resets the request queues and installs a fresh parser whose callbacks
  # dispatch to the request at the head of @clients.
  def post_init
    @clients = []
    @pending = []

    @p = Http::Parser.new
    @p.header_value_type = :mixed
    @p.on_headers_complete = proc do |h|
      if client
        if @p.status_code == 100
          # Interim "100 Continue": send the body and wait for the real response.
          client.send_request_body
          @p.reset!
        else
          client.parse_response_header(h, @p.http_version, @p.status_code)
          # The block's value is handed back to the parser; :reset is returned
          # for body-less requests (presumably to skip body parsing - confirm
          # against the http_parser.rb documentation).
          :reset if client.req.no_body?
        end
      else
        # if we receive unexpected data without a pending client request
        # reset the parser to avoid firing any further callbacks and close
        # the connection because we're processing invalid HTTP
        @p.reset!
        unbind
      end
    end

    @p.on_body = proc do |b|
      client.on_body_data(b)
    end

    @p.on_message_complete = proc do
      if client && !client.continue?
        c = @clients.shift
        c.state = :finished
        c.on_request_complete
      end
    end
  end

  # Instantiates +klass+ with the given arguments/block and appends it to
  # this connection's middleware chain.
  def use(klass, *args, &block)
    @middleware << klass.new(*args, &block)
  end

  # Address component of the stored peer sockaddr, or nil when it is not
  # available (any error while unpacking is swallowed).
  def peer
    Socket.unpack_sockaddr_in(@peer)[1] rescue nil
  end

  # Feeds raw bytes to the parser. A parse error fails the request at the
  # head of the queue, or tears the connection down when the queue is empty.
  def receive_data(data)
    @p << data
  rescue HTTP::Parser::Error => e
    c = @clients.shift
    c.nil? ? unbind(e.message) : c.on_error(e.message)
  end

  # Called once the socket is connected: records the peer name and performs
  # the SOCKS / CONNECT proxy handshake when configured before starting.
  def connection_completed
    @peer = @conn.get_peername

    if @connopts.socks_proxy?
      socksify(client.req.uri.hostname, client.req.uri.inferred_port, *@connopts.proxy[:authorization]) { start }
    elsif @connopts.connect_proxy?
      connectify(client.req.uri.hostname, client.req.uri.inferred_port, *@connopts.proxy[:authorization]) { start }
    else
      start
    end
  end

  # Starts TLS when the head request is an SSL request, then succeeds the
  # connection deferrable so that queued requests are dispatched.
  def start
    @conn.start_tls(@connopts.tls) if client && client.req.ssl?
    @conn.succeed
  end

  # Re-dispatches +client+ to +new_location+ after a redirect response.
  def redirect(client, new_location)
    old_location = client.req.uri
    new_location = client.req.set_uri(new_location)

    if client.req.keepalive
      # Application requested a keep-alive connection but one of the requests
      # hits a cross-origin redirect. We need to open a new connection and
      # let both connections proceed simultaneously.
      if old_location.origin != new_location.origin
        conn = HttpConnection.new
        client.conn = conn
        # NOTE(review): the options object is shared with this connection, so
        # the https flag set below is visible to both - confirm this is intended.
        conn.connopts = @connopts
        conn.connopts.https = new_location.scheme == "https"
        conn.uri = client.req.uri
        conn.activate_connection(client)

      # If the redirect is a same-origin redirect on a keep-alive request
      # then immediately dispatch the request over the existing connection.
      else
        @clients.push client
        client.connection_completed
      end
    else
      # If connection is not keep-alive the unbind will fire and we'll
      # reconnect using the same connection object.
      @pending.push client
    end
  end

  # Notifies every in-flight request that the socket closed, then either
  # reconnects for the next pending (redirected) request or marks the
  # connection as inactive and closes the socket. Safe to call on a
  # connection that was never activated.
  def unbind(reason = nil)
    @clients.each { |c| c.unbind(reason) }

    if (r = @pending.shift)
      @clients.push r

      r.reset!
      @p.reset!

      begin
        @conn.set_deferred_status :unknown

        # Through a proxy we reconnect to the proxy itself; otherwise to the
        # host/port of the redirected request.
        if @connopts.proxy
          @conn.reconnect(@connopts.host, @connopts.port)
        else
          @conn.reconnect(r.req.host, r.req.port)
        end

        @conn.pending_connect_timeout = @connopts.connect_timeout
        @conn.comm_inactivity_timeout = @connopts.inactivity_timeout
        @conn.callback { r.connection_completed }
      rescue EventMachine::ConnectionError => e
        @clients.pop.close(e.message)
      end
    else
      @deferred = true
      # No socket exists when the connection was never activated.
      @conn.close_connection if @conn
    end
  end
  alias :close :unbind

  # Writes +data+ to the underlying socket.
  def send_data(data)
    @conn.send_data data
  end

  # Streams the named file over the underlying socket.
  def stream_file_data(filename, args = {})
    @conn.stream_file_data filename, args
  end

  # Streams +io+ through this connection using EventMachine::IOStreamer.
  def stream_data(io, opts = {})
    EventMachine::IOStreamer.new(self, io, opts)
  end

  private

  # The request at the head of the in-flight queue, or nil when idle.
  def client
    @clients.first
  end
end
end