1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
|
require 'socket'
module Thin
# Connection between the server and client.
# This class is instantiated by EventMachine on each new connection
# that is opened.
class Connection < EventMachine::Connection
# Header names consulted by need_content_length? and written by
# set_content_length.
CONTENT_LENGTH = 'Content-Length'.freeze
TRANSFER_ENCODING = 'Transfer-Encoding'.freeze
# Matched against the Transfer-Encoding header value to detect chunked
# responses, which must not receive a Content-Length.
CHUNKED_REGEXP = /\bchunked\b/i.freeze
# NOTE(review): presumably provides the log, log_error and trace helpers
# used throughout this class -- confirm against the Logging module.
include Logging
# This is a template async response. N.B. Can't use string for body on 1.9
# Its status (-1) is what post_process compares against to recognise a
# response that will be sent later; pre_process returns this object when the
# application throws :async.
AsyncResponse = [-1, {}, []].freeze
# Rack application (adapter) served by this connection.
attr_accessor :app
# Backend to the server. It is notified through connection_finished when
# the connection is unbound.
attr_accessor :backend
# Current request served by the connection.
attr_accessor :request
# Next response sent through the connection.
attr_accessor :response
# When set to a truthy value, the application is called in a thread,
# allowing concurrent processing of requests (see threaded?).
attr_writer :threaded
# Get the connection ready to process a request by giving it a fresh
# Request and a fresh Response.
# Besides the initial set-up, terminate_request calls this again to reset
# the state before the next request on a persistent connection.
def post_init
@request = Request.new
@response = Response.new
end
# Feeds a chunk of bytes received from the client into the request parser.
#
# The request is processed as soon as the parser reports it is complete;
# until then nothing else happens. If the parser rejects the input with
# InvalidRequest, the error is logged and the connection is closed.
def receive_data(data)
  trace { data }
  request_complete = @request.parse(data)
  process if request_complete
rescue InvalidRequest => invalid_request
  log "!! Invalid request"
  log_error invalid_request
  close_connection
end
# Runs the application for a fully parsed request.
#
# In the non-threaded case the application is called inline and its result
# handed straight to post_process. In the threaded case both steps are
# handed to EventMachine.defer, with pre_process as the operation and
# post_process as its callback. The request is flagged either way so the
# application can tell which mode it runs in.
def process
  unless threaded?
    @request.threaded = false
    return post_process(pre_process)
  end

  @request.threaded = true
  EventMachine.defer(method(:pre_process), method(:post_process))
end
# Calls the Rack application and returns its response.
#
# Returns whatever <tt>@app.call</tt> returns, AsyncResponse when the
# application throws <tt>:async</tt>, or +nil+ when anything is raised (in
# which case the error has been handled and the request terminated here).
def pre_process
  # Expose the client address to the application through the request.
  @request.remote_address = remote_address

  # Give the application a handle for answering later. The connection object
  # stays referenced for as long as this callback is, so callers should not
  # hold on to it longer than needed. It is set unconditionally: even a
  # non-async framework may use it to spawn an async response.
  @request.async_callback = method(:post_process)

  # Stays the async template unless the application returns normally;
  # a `throw :async` leaves the catch block before the assignment happens.
  rack_response = AsyncResponse
  catch(:async) { rack_response = @app.call(@request.env) }
  rack_response
rescue Exception
  handle_error
  terminate_request
  nil # Tells post_process that the request could not be processed
end
# Sends the application's response to the client and finishes the request.
#
# +result+ is the value produced by the Rack application; it is converted
# with +to_a+ and unpacked as status, headers and body. Nothing is sent when:
# * +result+ is falsy (pre_process returns nil after a failure), or
# * the status equals AsyncResponse.first (-1), i.e. the application will
#   respond later.
#
# Any exception raised while preparing or sending the response is passed to
# handle_error. The +ensure+ clause runs on every exit path, including the
# two early returns.
def post_process(result)
return unless result
result = result.to_a
# Status code -1 indicates that we're going to respond later (async).
return if result.first == AsyncResponse.first
# Set the Content-Length header if possible
set_content_length(result) if need_content_length?(result)
@response.status, @response.headers, @response.body = *result
log "!! Rack application returned nil body. Probably you wanted it to be an empty string?" if @response.body.nil?
# Make the response persistent if requested by the client
@response.persistent! if @request.persistent?
# Send the response
@response.each do |chunk|
trace { chunk }
send_data chunk
end
rescue Exception
handle_error
ensure
# If the body is being deferred, then terminate afterward.
# A body is treated as deferred when it responds to both callback and errback.
if @response.body.respond_to?(:callback) && @response.body.respond_to?(:errback)
@response.body.callback { terminate_request }
@response.body.errback { terminate_request }
else
# Don't terminate the response if we're going async.
# Note that a falsy +result+ does not satisfy the async test, so the
# request is terminated in that case as well.
terminate_request unless result && result.first == AsyncResponse.first
end
end
# Logs the exception currently being handled (<tt>$!</tt>) and closes the
# connection.
#
# Must be called from within a +rescue+ clause, since the message is read
# from <tt>$!</tt>. A failure while closing the connection is ignored.
def handle_error
  log "!! Unexpected error while processing request: #{$!.message}"
  log_error
  begin
    close_connection
  rescue StandardError
    nil
  end
end
# Releases what the current request/response pair holds on to.
#
# Marks the request's +async_close+ object (when there is one) as succeeded,
# then closes the request and the response. Errors raised by either +close+
# are ignored so that one failure does not prevent the other clean-up.
def close_request_response
  if @request.async_close
    @request.async_close.succeed
  end

  begin
    @request.close
  rescue StandardError
    nil
  end

  begin
    @response.close
  rescue StandardError
    nil
  end
end
# Finishes the current request and cleans up after it.
#
# The request and response are always closed through close_request_response.
# On a persistent connection a fresh request/response pair is then created
# so the socket can serve another request; otherwise the connection is
# scheduled to close once pending data has been written (any error from
# that call is ignored).
def terminate_request
  if persistent?
    close_request_response
    # Prepare the connection for another request from the same client.
    post_init
  else
    begin
      close_connection_after_writing
    rescue StandardError
      nil
    end
    close_request_response
  end
end
# Called when the connection is unbound from the socket and can no longer
# be used to process requests.
#
# Resolves whatever may still be waiting on this connection, then tells the
# backend that the connection is finished.
def unbind
  # Signal whoever is waiting on the request's async_close object.
  if @request.async_close
    @request.async_close.succeed
  end

  # A body that can be failed is told that it will never complete.
  if @response.body.respond_to?(:fail)
    @response.body.fail
  end

  @backend.connection_finished(self)
end
# Allows this connection to be persistent.
# This only grants permission: persistent? additionally requires the
# current response to be persistent.
def can_persist!
@can_persist = true
end
# Return +true+ if this connection is allowed to stay open and be persistent.
# Returns the flag set by can_persist!; in the code shown here nothing else
# assigns it, so the result is +nil+ until can_persist! has been called.
def can_persist?
@can_persist
end
# Tells whether the connection must be left open, ready to be reused for
# another request.
#
# This requires both that persistence was allowed for the connection (see
# can_persist!) and that the current response is persistent. When
# persistence was not allowed, the falsy flag itself is returned.
def persistent?
  return @can_persist unless @can_persist

  @response.persistent?
end
# Truthy if <tt>app.call</tt> will be called inside a thread.
#
# Threading can be enabled for every request with
# <tt>Connection#threaded=true</tt>, or decided per request by an application
# that implements <tt>deferred?(env)</tt>.
def threaded?
  # The connection-wide setting wins; the application is not consulted.
  return @threaded if @threaded

  @app.respond_to?(:deferred?) && @app.deferred?(@request.env)
end
# IP address of the remote client.
#
# Returns the value of socket_address, or +nil+ when the lookup raises; in
# that case the error is logged instead of being propagated.
def remote_address
  address = nil
  begin
    address = socket_address
  rescue Exception
    log_error
  end
  address
end
protected
# Returns the IP address of the peer as a string.
#
# The packed socket address from get_peername is unpacked into a
# [port, address] pair; only the address is returned.
def socket_address
  _port, ip_address = Socket.unpack_sockaddr_in(get_peername)
  ip_address
end
private
# Decides whether a Content-Length header should be computed for +result+.
#
# +result+ is a [status, headers, body] triple. Returns +false+ when:
# * the status is -1 (async response),
# * the headers already carry a Content-Length,
# * the status is 1xx, 204 or 304,
# * the response uses chunked transfer encoding, or
# * the body is neither a String nor an Array, so its size is not known
#   up front.
# Returns +true+ otherwise.
#
# Header names are matched case-insensitively, as HTTP field names are not
# case-sensitive. An application that emits <tt>content-length</tt> or
# <tt>transfer-encoding</tt> in another case must not get a second,
# possibly conflicting Content-Length added.
def need_content_length?(result)
  status, headers, body = result
  return false if status == -1
  return false if find_header_key(headers, CONTENT_LENGTH)
  return false if (100..199).include?(status) || status == 204 || status == 304

  encoding_key = find_header_key(headers, TRANSFER_ENCODING)
  return false if encoding_key && headers[encoding_key] =~ CHUNKED_REGEXP
  return false unless body.kind_of?(String) || body.kind_of?(Array)

  true
end

# Returns the key under which header +name+ is stored in +headers+, ignoring
# case, or +nil+ when no such header is present. An exact match is tried
# first so that the common case costs a single lookup.
def find_header_key(headers, name)
  return name if headers.has_key?(name)
  return nil unless headers.respond_to?(:each_key)

  headers.each_key do |key|
    return key if key.respond_to?(:casecmp) && key.casecmp(name) == 0
  end
  nil
end
# Stores the byte length of the response body in the Content-Length header.
#
# +result+ is a [status, headers, body] triple. Only String and Array bodies
# are measured; for an Array the lengths of all parts are added up. Any other
# kind of body leaves the headers untouched. The header value is a String.
def set_content_length(result)
  _status, headers, body = result

  # Prefer the byte count over the character count where it is available.
  # See http://redmine.ruby-lang.org/issues/show/203
  byte_count = lambda do |chunk|
    chunk.respond_to?(:bytesize) ? chunk.bytesize : chunk.size
  end

  length =
    case body
    when String then byte_count.call(body)
    when Array  then body.inject(0) { |total, part| total + byte_count.call(part) }
    end

  headers[CONTENT_LENGTH] = length.to_s if length
end
end
end
|