File: client.rb

package info (click to toggle)
ruby-remcached 0.4.1-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 200 kB
  • sloc: ruby: 1,000; sh: 9; makefile: 2
file content (165 lines) | stat: -rw-r--r-- 4,067 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
require 'eventmachine'

module Memcached
  ##
  # EventMachine connection that splits the incoming byte stream into
  # response packets, periodically checks for receive timeouts, and
  # schedules a reconnect whenever the connection is lost.
  #
  # This class calls +receive_packet(response)+ and +send_keepalive+ but
  # does not define them; subclasses are expected to provide both.
  class Connection < EventMachine::Connection
    # Number of bytes consumed from the stream as a response header.
    HEADER_LENGTH = 24

    # Base delay (seconds) before a reconnect attempt after #unbind.
    RECONNECT_DELAY = 10
    # Exclusive upper bound for the random extra delay added to
    # RECONNECT_DELAY (passed to +rand+).
    RECONNECT_JITTER = 5

    # Seconds without any received data after which the connection is closed.
    RECEIVE_TIMEOUT = 15
    # Seconds without any received data after which a keepalive is sent.
    KEEPALIVE_INTERVAL = 5

    ##
    # Opens a connection to +host+:+port+ using this class as the handler.
    #
    # host:: server address handed to EventMachine.connect
    # port:: server port (default 11211)
    # connect_callback:: optional block registered on the connect
    #                    deferrable; it is called with the connection
    #                    each time #connection_completed fires for this
    #                    deferrable
    #
    # Returns whatever EventMachine.connect returns.
    def self.connect(host, port=11211, &connect_callback)
      df = EventMachine::DefaultDeferrable.new
      df.callback(&connect_callback)

      EventMachine.connect(host, port, self) do |me|
        # Remember the endpoint so #reconnect can reuse it later.
        me.instance_eval do
          @host, @port = host, port
          @connect_deferrable = df
        end
      end
    end

    # True once #connection_completed has run and until #unbind is called.
    def connected?
      @connected
    end

    ##
    # Re-establishes the connection to the stored host and port.
    #
    # Returns a fresh deferrable that succeeds (with this connection)
    # when the new connection completes.
    def reconnect
      @connect_deferrable = EventMachine::DefaultDeferrable.new
      super @host, @port
      @connect_deferrable
    end

    # Initialises receive buffering and connection bookkeeping.
    def post_init
      reset_receive_state
      @connected = false
      @keepalive_timer = nil
    end

    ##
    # Marks the connection as established, starts the once-per-second
    # keepalive check and finally resolves the connect deferrable.
    def connection_completed
      @connected = true
      @last_receive = Time.now

      # Never leave two periodic timers running for one connection.
      @keepalive_timer.cancel if @keepalive_timer
      @keepalive_timer = EventMachine::PeriodicTimer.new(1, &method(:keepalive))

      # Run user callbacks last so that an exception raised by one of
      # them cannot leave the connection without its keepalive timer.
      # The deferrable is absent when the handler was not created
      # through Connection.connect.
      @connect_deferrable.succeed(self) if @connect_deferrable
    end

    ##
    # Stops the keepalive timer, discards partially received data and
    # schedules a reconnect after RECONNECT_DELAY plus random jitter.
    def unbind
      @keepalive_timer.cancel if @keepalive_timer
      @keepalive_timer = nil

      @connected = false

      # The same object is reused by #reconnect, so bytes of a
      # half-received packet from the old stream must not be combined
      # with data from the new one.
      # NOTE(review): relies on EventMachine not re-running post_init on
      # reconnect -- confirm against the EventMachine version in use.
      reset_receive_state

      EventMachine::Timer.new(RECONNECT_DELAY + rand(RECONNECT_JITTER),
                              method(:reconnect))
    end

    ##
    # Periodic timer callback: closes the connection when nothing has
    # been received for RECEIVE_TIMEOUT seconds, otherwise sends a
    # keepalive once KEEPALIVE_INTERVAL seconds have passed in silence.
    def keepalive
      now = Time.now
      if @last_receive + RECEIVE_TIMEOUT <= now
        # Diagnostics go to stderr so the host application's stdout
        # stays clean.
        $stderr.puts "Memcached: no data received from #{@host}:#{@port} " \
                     "for #{RECEIVE_TIMEOUT}s, closing connection"
        close_connection
      elsif @last_receive + KEEPALIVE_INTERVAL <= now
        send_keepalive
      end
    end

    # Serialises +pkt+ with +to_s+ and writes it to the connection.
    def send_packet(pkt)
      send_data pkt.to_s
    end

    ##
    # Appends +data+ to the receive buffer and dispatches every complete
    # packet it now contains to +receive_packet+.
    #
    # Parsing alternates between two states: +:header+ waits for
    # HEADER_LENGTH bytes and hands them to Response.parse_header;
    # +:body+ waits for the number of bytes given by the parsed header's
    # +:total_body_length+ and lets the response consume them.
    def receive_data(data)
      @recv_buf += data
      @last_receive = Time.now

      loop do
        if @recv_state == :header && @recv_buf.bytesize >= HEADER_LENGTH
          @received = Response.parse_header(@recv_buf.byteslice(0, HEADER_LENGTH))
          @recv_buf = @recv_buf.byteslice(HEADER_LENGTH..-1)
          @recv_state = :body

        elsif @recv_state == :body && @recv_buf.bytesize >= @received[:total_body_length]
          # parse_body returns what is left of the buffer after the body.
          @recv_buf = @received.parse_body(@recv_buf)

          # Switch back before dispatching: if the handler raises, the
          # next chunk must still be parsed starting with a header.
          @recv_state = :header
          receive_packet(@received)

        else
          # Not enough bytes buffered for the next step yet.
          break
        end
      end
    end

    private

    # Starts packet framing from scratch with an empty binary buffer.
    def reset_receive_state
      @recv_buf = String.new
      @recv_state = :header
    end
  end

  ##
  # Connection that matches responses to requests through the packet's
  # +:opaque+ field and invokes the callback registered for each request.
  class Client < Connection
    # Sets up the opaque counter and the list of pending
    # <tt>[opaque, callback]</tt> pairs.
    def post_init
      super
      @opaque_counter = 0
      @pending = []
    end

    ##
    # Notifies every pending callback with
    # <tt>:status => Errors::DISCONNECTED</tt> and empties the pending
    # list.
    #
    # Entries are removed one at a time before their callback runs, so a
    # callback that raises neither prevents the remaining callbacks from
    # being notified nor stays queued. Errors are reported the same way
    # as in #receive_packet.
    def unbind
      super
      until @pending.empty?
        _opaque, callback = @pending.shift
        begin
          callback.call :status => Errors::DISCONNECTED
        rescue StandardError => e
          report_callback_error(e)
        end
      end
    end

    ##
    # Assigns the next opaque value (wrapping at 2**32) to +pkt+, sends
    # it, and remembers +callback+ (if given) for the response.
    #
    # pkt:: request packet; must support <tt>[]=</tt> and +to_s+
    # callback:: optional block called with the response; returning
    #            +:proceed+ keeps it registered for further responses
    def send_request(pkt, &callback)
      @opaque_counter = (@opaque_counter + 1) % (1 << 32)
      pkt[:opaque] = @opaque_counter
      send_packet pkt

      @pending << [@opaque_counter, callback] if callback
    end

    ##
    # memcached responses possess the same order as their
    # corresponding requests. Therefore quiet requests that have not
    # yielded responses will be dropped silently to free memory from
    # +@pending+
    #
    # When a callback has been fired and returned +:proceed+ without a
    # succeeding packet, we still keep it referenced around for
    # commands such as STAT which has multiple response packets.
    #
    # A StandardError raised by the callback is reported on stderr and
    # its entry is left at the head of +@pending+. Other exceptions
    # (e.g. SystemExit, Interrupt) are deliberately not swallowed.
    def receive_packet(response)
      wanted = response[:opaque]
      position = @pending.index { |opaque, _callback| opaque == wanted }
      # Responses without a registered callback are ignored.
      return unless position

      # Everything queued before the match will never be answered.
      @pending.shift(position)
      _opaque, callback = @pending.first

      begin
        @pending.shift if callback.call(response) != :proceed
      rescue StandardError => e
        report_callback_error(e)
      end
    end

    # Sends a NoOp request without registering a callback.
    def send_keepalive
      send_request Request::NoOp.new
    end

    ##
    # Sends a Stats request built from +contents+.
    #
    # Callback will be called multiple times: once per response packet,
    # for as long as the packet has status Errors::NO_ERROR and a
    # non-empty +:key+; the first packet failing that test is still
    # passed to the callback and ends the sequence. The block is
    # optional.
    def stats(contents={}, &callback)
      send_request Request::Stats.new(contents) do |result|
        callback.call result if callback

        :proceed if result[:status] == Errors::NO_ERROR && result[:key] != ''
      end
    end

    private

    # Prints the class, message and backtrace of a callback failure to stderr.
    def report_callback_error(error)
      $stderr.puts "#{error.class}: #{error}\n" + Array(error.backtrace).join("\n")
    end
  end
end