# Copyright 2011-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
require 'thread'
module AWS
  module Core
    module Http

      # @api private
      #
      # HTTP handler that sends requests through a shared Curl::Multi
      # handle driven by one background thread. Callers block inside
      # {#handle} until the transfer for their request has completed.
      class CurbHandler

        # Stored on the response (not raised) when a transfer completes
        # with a response code that is not positive.
        class NetworkError < StandardError; end

        # HTTP methods that {#handle} accepts.
        SUPPORTED_METHODS = %w[GET HEAD PUT POST DELETE].freeze

        def initialize
          @q = []
          @sem = Mutex.new
          # signalled (while holding @sem) whenever an item is queued
          @wakeup = ConditionVariable.new
          @multi = Curl::Multi.new
          start_processor
        end

        # Queues the request for the background processor and blocks the
        # calling thread until the transfer has completed.
        #
        # @param request the request to send; must respond to http_method,
        #   use_ssl?, host, port, uri, headers and body
        # @param response receives headers, status, body and (on failure)
        #   network_error
        # @yield [chunk] when a block is given, each body chunk is passed
        #   to it instead of being collected into response.body
        # @return [nil]
        # @raise [RuntimeError] if the request's HTTP method is unsupported
        def handle(request, response, &read_block)
          unless SUPPORTED_METHODS.include?(request.http_method)
            raise "unsupported http request method: #{request.http_method}"
          end

          # A per-request queue is used as the completion signal: unlike
          # Thread.stop / Thread#run, a push that happens before the pop
          # is not lost.
          done = Queue.new

          @sem.synchronize do
            @q << [request, response, read_block, done]
            # restart the processor if the previous one has died
            start_processor unless @processor.alive?
            @wakeup.signal
          end

          done.pop
          nil
        end

        private

        # Fills the Curl::Multi handle with the given array of queue
        # items, calling make_easy_handle on each one first.
        def fill_multi(items)
          items.each do |item|
            @multi.add(make_easy_handle(*item))
          end
        end

        # Starts a background thread that waits for new items and
        # sends them through the Curl::Multi handle.
        def start_processor
          @processor = Thread.new do
            loop do
              # Checking the queue and going to sleep happen under the
              # same lock that producers hold while queuing, so a signal
              # cannot slip in between the check and the wait.
              items = @sem.synchronize do
                @wakeup.wait(@sem) while @q.empty?
                @q.slice!(0..-1)
              end

              fill_multi(items)
              @multi.perform do
                # curl is idle, so process more items if we can get them
                # without blocking (the emptiness check is only a hint;
                # the queue is drained under the lock)
                if !@q.empty? && @sem.try_lock
                  begin
                    fill_multi(@q.slice!(0..-1))
                  ensure
                    @sem.unlock
                  end
                end
              end
            end
          end
        end

        # Builds a Curl::Easy handle for the request and wires its
        # callbacks to populate the response.
        #
        # @param request the request to translate into a curl handle
        # @param response object populated by the callbacks
        # @param read_block [Proc, nil] receives body chunks when given
        # @param waiter [Queue, Thread, nil] notified once the transfer
        #   has completed
        # @return [Curl::Easy]
        def make_easy_handle(request, response, read_block, waiter = nil)
          protocol = request.use_ssl? ? 'https' : 'http'
          url = "#{protocol}://#{request.host}:#{request.port}#{request.uri}"

          curl = Curl::Easy.new(url)
          request.headers.each { |k, v| curl.headers[k] = v }

          curl.on_header do |header_data|
            # Whitespace after the colon is optional and the value may be
            # empty; lines without a colon (status line, blank line)
            # yield no value and are skipped.
            name, value = header_data.strip.split(/:\s*/, 2)
            response.headers[name] = value if value && !name.empty?
            # report bytes consumed, not characters
            header_data.bytesize
          end

          case request.http_method
          when 'HEAD'
            curl.head = true
          when 'PUT'
            curl.put_data = request.body || ''
          when 'POST'
            # NOTE(review): presumably keeps libcurl from adding its own
            # default Content-Type for POST bodies -- confirm.
            curl.headers['Content-Type'] ||= ''
            curl.post_body = request.body || ''
          when 'DELETE'
            curl.delete = true
          end
          # 'GET' needs no extra configuration

          buffer = []
          curl.on_body do |chunk|
            if read_block
              read_block.call(chunk)
            else
              buffer << chunk
            end
            chunk.bytesize
          end

          curl.on_complete do
            response.status = curl.response_code
            unless curl.response_code > 0
              response.network_error =
                NetworkError.new('Empty response. Assume network error.')
            end
            response.body = buffer.join('') unless read_block
            notify_waiter(waiter)
          end

          curl
        end

        # Signals completion to whoever is waiting on a transfer. A Thread
        # is woken with #run (kept for callers that still pass a thread);
        # anything else is treated as a queue and gets a value pushed.
        def notify_waiter(waiter)
          return unless waiter
          if waiter.respond_to?(:run)
            waiter.run
          else
            waiter << true
          end
        end

      end
    end
  end
end
|