
|
require 'eventmachine'
module Memcached
# EventMachine connection that frames an incoming byte stream into packets
# made of a 24-byte header followed by a body.
#
# Besides framing, it tracks connection state, schedules a reconnect every
# time the socket is unbound, and runs a once-per-second timer that either
# sends a keepalive or closes the connection when nothing has been received
# for a while.
#
# Subclasses must provide +receive_packet(response)+ and +send_keepalive+;
# both are called here but not defined in this class.
class Connection < EventMachine::Connection
  # Base delay (seconds) before a reconnect attempt is made after #unbind.
  RECONNECT_DELAY = 10
  # Exclusive upper bound of the random whole seconds added to RECONNECT_DELAY.
  RECONNECT_JITTER = 5
  # Seconds without any received data after which the connection is closed.
  RECEIVE_TIMEOUT = 15
  # Seconds without any received data after which keepalives are sent.
  KEEPALIVE_INTERVAL = 5
  # Size in bytes of the fixed packet header.
  HEADER_LENGTH = 24

  # Opens a connection to +host+:+port+.
  #
  # The optional block is registered as the success callback of the initial
  # connect deferrable and receives the connection once it is established.
  # Returns whatever EventMachine.connect returns.
  def self.connect(host, port=11211, &connect_callback)
    df = EventMachine::DefaultDeferrable.new
    df.callback(&connect_callback) if connect_callback
    EventMachine.connect(host, port, self) do |me|
      me.instance_eval {
        @host, @port = host, port
        @connect_deferrable = df
      }
    end
  end

  # True between #connection_completed and the next #unbind.
  def connected?
    @connected
  end

  # Reconnects to the host and port remembered by Connection.connect.
  #
  # Returns a fresh deferrable that succeeds (with this connection) when the
  # new connection completes. It starts out without callbacks, so callers
  # that care must attach their own to the returned object.
  def reconnect
    @connect_deferrable = EventMachine::DefaultDeferrable.new
    super @host, @port
    @connect_deferrable
  end

  # Initializes per-connection state.
  def post_init
    reset_receive_state
    @connected = false
    @keepalive_timer = nil
  end

  # Marks the connection as established, starts the keepalive timer and then
  # fires the pending connect deferrable.
  def connection_completed
    @connected = true
    @last_receive = Time.now
    @keepalive_timer = EventMachine::PeriodicTimer.new(1, &method(:keepalive))
    # Run user callbacks last so that an exception raised by one of them
    # cannot leave the connection without its timestamp and keepalive timer.
    @connect_deferrable.succeed(self)
  end

  # Stops the keepalive timer, discards any partially received packet and
  # schedules a reconnect after RECONNECT_DELAY plus jitter.
  def unbind
    if @keepalive_timer
      @keepalive_timer.cancel
      @keepalive_timer = nil
    end
    @connected = false
    # #reconnect reuses this very object, so leftovers from the old socket
    # would otherwise be parsed as the beginning of the new stream.
    reset_receive_state
    EventMachine::Timer.new(RECONNECT_DELAY + rand(RECONNECT_JITTER),
                            method(:reconnect))
  end

  # Periodic check driven by the timer started in #connection_completed:
  # closes the connection after RECEIVE_TIMEOUT seconds of silence, or asks
  # the subclass for a keepalive after KEEPALIVE_INTERVAL seconds of silence.
  def keepalive
    now = Time.now
    if @last_receive + RECEIVE_TIMEOUT <= now
      p :timeout
      close_connection
    elsif @last_receive + KEEPALIVE_INTERVAL <= now
      send_keepalive
    end
  end

  # Serializes +pkt+ with +to_s+ and writes it to the socket.
  def send_packet(pkt)
    send_data pkt.to_s
  end

  # Buffers incoming +data+ and hands every complete packet to
  # +receive_packet+. Alternates between waiting for a full header and
  # waiting for the body length announced by that header.
  def receive_data(data)
    @recv_buf += data
    @last_receive = Time.now
    loop do
      if @recv_state == :header && @recv_buf.bytesize >= HEADER_LENGTH
        @received = Response.parse_header(@recv_buf.byteslice(0, HEADER_LENGTH))
        @recv_buf = @recv_buf.byteslice(HEADER_LENGTH..-1)
        @recv_state = :body
      elsif @recv_state == :body && @recv_buf.bytesize >= @received[:total_body_length]
        # parse_body returns what becomes the new receive buffer.
        @recv_buf = @received.parse_body(@recv_buf)
        receive_packet(@received)
        @recv_state = :header
      else
        # Not enough bytes for the next step; wait for more data.
        break
      end
    end
  end

  private

  # Empties the receive buffer and expects a packet header next.
  def reset_receive_state
    @recv_buf = ""
    @recv_state = :header
  end
end
# Request/response client on top of Connection.
#
# Every request is tagged with an incrementing 32-bit opaque value; the
# callbacks of requests still awaiting a response are kept, in send order,
# in +@pending+ as +[opaque, callback]+ pairs.
class Client < Connection
  # Initializes the opaque counter and the list of pending callbacks.
  def post_init
    super
    @opaque_counter = 0
    @pending = []
  end

  # Notifies every pending callback with a DISCONNECTED status and forgets
  # them all.
  def unbind
    super
    @pending.each do |_opaque, callback|
      begin
        callback.call :status => Errors::DISCONNECTED
      rescue StandardError => e
        # One failing callback must neither keep the remaining ones from
        # being notified nor prevent the list from being cleared below.
        report_callback_error(e)
      end
    end
    @pending = []
  end

  # Sends +pkt+ after assigning it the next opaque value (wrapping at 2**32).
  # When a block is given it is remembered and later invoked by
  # #receive_packet with the matching response, or by #unbind with
  # <tt>:status => Errors::DISCONNECTED</tt>.
  def send_request(pkt, &callback)
    @opaque_counter = (@opaque_counter + 1) % (1 << 32)
    pkt[:opaque] = @opaque_counter
    send_packet pkt
    @pending << [@opaque_counter, callback] if callback
  end

  ##
  # Dispatches +response+ to the pending callback with the same opaque.
  #
  # Responses are assumed to arrive in the same order as their requests.
  # Pending entries queued before the matching one therefore belong to
  # requests that will not be answered any more (quiet requests) and are
  # dropped silently to free memory from +@pending+.
  #
  # A callback that returns +:proceed+ stays at the head of +@pending+ so
  # that it can receive further packets; this is what commands with several
  # response packets, such as STAT, rely on. Any other return value removes
  # it. Responses without a matching entry are ignored.
  #
  # Errors raised by the callback are reported on stderr and the entry is
  # left in place.
  def receive_packet(response)
    position = @pending.index { |opaque, _callback| opaque == response[:opaque] }
    return unless position

    callback = @pending[position][1]
    @pending = @pending.drop(position)
    begin
      @pending.shift if callback.call(response) != :proceed
    rescue StandardError => e
      report_callback_error(e)
    end
  end

  # Sends a NoOp request; called by Connection#keepalive.
  def send_keepalive
    send_request Request::NoOp.new
  end

  # Requests statistics. The block is called once per response packet, and
  # keeps being called for as long as packets report NO_ERROR with a
  # non-empty key.
  def stats(contents={}, &callback)
    send_request Request::Stats.new(contents) do |result|
      callback.call result if callback
      if result[:status] == Errors::NO_ERROR && result[:key] != ''
        :proceed
      end
    end
  end

  private

  # Prints the class, message and backtrace of +e+ to stderr.
  def report_callback_error(e)
    $stderr.puts "#{e.class}: #{e}\n" + e.backtrace.join("\n")
  end
end
end
|