1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
|
require 'eventmachine'
module Memcached
##
# EventMachine connection handling the framing of memcached packets:
# incoming bytes are buffered, split into a fixed-size header plus a
# body, and every complete packet is handed to +receive_packet+. The
# connection also schedules a reconnect whenever it is unbound and
# runs a periodic keepalive/timeout check while connected.
#
# This class does not define +receive_packet(response)+ or
# +send_keepalive+ itself; subclasses must provide both.
class Connection < EventMachine::Connection
  # Number of bytes handed to Response.parse_header for each packet.
  HEADER_LENGTH = 24

  # Base delay in seconds before reconnecting after +unbind+.
  RECONNECT_DELAY = 10
  # Exclusive upper bound for the random whole seconds added to
  # RECONNECT_DELAY, so reconnect attempts are spread out.
  RECONNECT_JITTER = 5

  # Seconds without any received data after which the connection is closed.
  RECEIVE_TIMEOUT = 15
  # Seconds without any received data after which keepalives are sent.
  KEEPALIVE_INTERVAL = 5

  ##
  # Opens a connection to +host+:+port+ using this class as handler.
  #
  # The optional block is registered as success callback of the
  # connect deferrable and is called with the connection once
  # +connection_completed+ runs.
  #
  # Returns whatever EventMachine.connect returns.
  def self.connect(host, port=11211, &connect_callback)
    df = EventMachine::DefaultDeferrable.new
    df.callback(&connect_callback) if connect_callback
    EventMachine.connect(host, port, self) do |me|
      me.instance_eval do
        # Remembered so that #reconnect can dial the same endpoint again.
        @host, @port = host, port
        @connect_deferrable = df
      end
    end
  end

  # Whether the connection is currently established.
  def connected?
    @connected
  end

  ##
  # Re-dials the remembered host and port.
  #
  # Returns a fresh deferrable that succeeds once the new connection
  # is completed.
  def reconnect
    @connect_deferrable = EventMachine::DefaultDeferrable.new
    super @host, @port
    @connect_deferrable
  end

  # Initialises receive state and connection flags for a new handler.
  def post_init
    reset_receive_state
    @connected = false
    @keepalive_timer = nil
  end

  ##
  # Marks the connection as established, starts the once-per-second
  # keepalive check and finally fires the connect deferrable.
  def connection_completed
    # EventMachine reuses this handler object for #reconnect without
    # running #post_init again, so bytes of a packet that was cut off
    # by the previous disconnect must be discarded here.
    reset_receive_state
    @connected = true
    @last_receive = Time.now
    @keepalive_timer = EventMachine::PeriodicTimer.new(1, &method(:keepalive))
    # Fire callbacks last: the connection is fully set up when they
    # run, and a raising callback cannot prevent the keepalive timer
    # from being installed.
    @connect_deferrable.succeed(self) if @connect_deferrable
  end

  ##
  # Stops the keepalive timer and schedules a reconnect attempt after
  # RECONNECT_DELAY plus a random jitter.
  def unbind
    @keepalive_timer.cancel if @keepalive_timer
    @keepalive_timer = nil
    @connected = false
    EventMachine::Timer.new(RECONNECT_DELAY + rand(RECONNECT_JITTER),
                            method(:reconnect))
  end

  ##
  # Periodic check: closes the connection when nothing has been
  # received for RECEIVE_TIMEOUT seconds, otherwise sends a keepalive
  # when nothing has been received for KEEPALIVE_INTERVAL seconds.
  def keepalive
    now = Time.now
    if @last_receive + RECEIVE_TIMEOUT <= now
      p :timeout
      close_connection
    elsif @last_receive + KEEPALIVE_INTERVAL <= now
      send_keepalive
    end
  end

  # Serialises +pkt+ with +to_s+ and writes it to the connection.
  def send_packet(pkt)
    send_data pkt.to_s
  end

  ##
  # Appends +data+ to the receive buffer and dispatches every packet
  # that has become complete to +receive_packet+.
  def receive_data(data)
    @recv_buf += data
    @last_receive = Time.now

    loop do
      if @recv_state == :header && @recv_buf.length >= HEADER_LENGTH
        @received = Response.parse_header(@recv_buf[0...HEADER_LENGTH])
        @recv_buf = @recv_buf[HEADER_LENGTH..-1]
        @recv_state = :body
      elsif @recv_state == :body && @recv_buf.length >= @received[:total_body_length]
        # parse_body returns the buffer content that follows the body.
        @recv_buf = @received.parse_body(@recv_buf)
        # Switch back before dispatching: the body is already consumed,
        # so the parser stays in sync even if the handler raises.
        @recv_state = :header
        receive_packet(@received)
      else
        break
      end
    end
  end

  private

  # Empties the receive buffer and expects a packet header next.
  def reset_receive_state
    @recv_buf = ""
    @recv_state = :header
  end
end
##
# Request/response layer on top of Connection: every request is
# tagged with an opaque number, and the callback registered for that
# number is invoked when a response carrying it arrives.
class Client < Connection
  # Sets up the opaque counter and the queue of pending callbacks.
  def post_init
    super
    @opaque_counter = 0
    @pending = []
  end

  ##
  # Notifies every pending callback with a DISCONNECTED status and
  # empties the queue, in addition to the reconnect handling of the
  # superclass.
  def unbind
    super
    # Detach the queue before running callbacks, so it is emptied even
    # if a callback raises, and so requests issued from within a
    # callback are not discarded together with the old entries.
    abandoned, @pending = @pending, []
    abandoned.each do |_opaque, callback|
      begin
        callback.call :status => Errors::DISCONNECTED
      rescue StandardError => e
        # One failing callback must not keep the others from being told.
        report_callback_error(e)
      end
    end
  end

  ##
  # Assigns the next opaque number (wrapping at 2**32) to +pkt+ and
  # sends it. When a block is given it is queued and later called with
  # the matching response.
  def send_request(pkt, &callback)
    @opaque_counter = (@opaque_counter + 1) % (1 << 32)
    pkt[:opaque] = @opaque_counter
    send_packet pkt
    @pending << [@opaque_counter, callback] if callback
  end

  ##
  # Dispatches +response+ to the pending callback with the same opaque.
  #
  # memcached responses arrive in the same order as their requests.
  # Entries queued before the matching one belong to quiet requests
  # that produced no response, so they are dropped silently to free
  # memory from +@pending+.
  #
  # A callback returning +:proceed+ stays queued so that commands with
  # several response packets, such as STAT, keep receiving them. Any
  # other return value removes it. A callback that raises is reported
  # on stderr and stays queued.
  def receive_packet(response)
    pos = @pending.index { |opaque, _callback| opaque == response[:opaque] }
    return unless pos

    @pending = @pending[pos..-1]
    callback = @pending.first.last
    begin
      @pending.shift unless callback.call(response) == :proceed
    rescue StandardError => e
      # StandardError only: SystemExit, Interrupt and the like raised
      # inside a callback must not be swallowed here.
      report_callback_error(e)
    end
  end

  # Sends a NoOp request without callback to keep the connection alive.
  def send_keepalive
    send_request Request::NoOp.new
  end

  ##
  # Requests statistics. The block is optional and will be called
  # multiple times, once per response packet: the request stays
  # pending as long as packets with NO_ERROR status and a non-empty
  # key arrive.
  def stats(contents={}, &callback)
    send_request(Request::Stats.new(contents)) do |result|
      callback.call result if callback
      :proceed if result[:status] == Errors::NO_ERROR && result[:key] != ''
    end
  end

  private

  # Prints class, message and backtrace of a callback error to stderr.
  def report_callback_error(e)
    $stderr.puts "#{e.class}: #{e}\n" + e.backtrace.join("\n")
  end
end
end
|