File: event_machine.rb

package info (click to toggle)
ruby-memcache-client 1.8.5-2
  • links: PTS, VCS
  • area: main
  • in suites: wheezy
  • size: 268 kB
  • sloc: ruby: 2,120; makefile: 6
file content (173 lines) | stat: -rw-r--r-- 3,535 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# Extensions for using memcache-client with EventMachine

# Abort loading on interpreters that report a version below 1.9 (RUBY_VERSION
# is compared as a String). Presumably this is because the Fiber API this
# file relies on is not available earlier -- confirm.
raise "memcache/event_machine requires Ruby 1.9" if RUBY_VERSION < '1.9'

require 'memcache'
require 'eventmachine'
require 'fiber'

class MemCache
  
  # Since we are working in a single-Thread, multiple-Fiber environment,
  # the multithread Mutex will not work and should be disabled.
  # NOTE: the override below is commented out, so it is not applied by this file.
#  DEFAULT_OPTIONS[:multithread] = false

  module EventedServer

    # Key under which this server's socket is stored in Thread.current.
    # Built from @host and @port on first use and memoized in @fiber_key,
    # so later changes to @host/@port do not alter it.
    def fiber_key
      return @fiber_key if @fiber_key

      @fiber_key = "memcached-#{@host}-#{@port}"
    end
    
    # Returns the connection stored in Thread.current under fiber_key,
    # opening a new one through EM::SocketConnection.connect when there is
    # none or the stored one reports closed?.
    #
    # Returns nil without connecting while @retry lies in the future, and
    # nil when the connection's errback fires.
    #
    # If neither the callback nor the errback has fired by the time both are
    # registered, the current fiber is suspended with Fiber.yield and is
    # resumed by whichever handler fires first.
    def socket
      cached = Thread.current[fiber_key]
      return cached if cached && !cached.closed?

      Thread.current[fiber_key] = nil

      # If the host was dead, don't retry for a while.
      return if @retry && @retry > Time.now

      conn = EM::SocketConnection.connect(@host, @port, @timeout)
      waiting = true
      owner = Fiber.current

      conn.callback do
        @status = 'CONNECTED'
        @retry  = nil
        waiting = false
        # Only resume when fired from another fiber; a handler that runs
        # synchronously during registration is already inside `owner`.
        owner.resume unless Fiber.current == owner
      end

      conn.errback do
        # Make the method return (and store) nil for a failed attempt.
        conn = nil
        waiting = false
        owner.resume unless Fiber.current == owner
      end

      Fiber.yield if waiting
      Thread.current[fiber_key] = conn
    end

    # Closes the socket stored in Thread.current under fiber_key (if any and
    # not already closed), clears that slot, and resets @retry and @status.
    # Returns the new status string.
    def close
      if (conn = Thread.current[fiber_key])
        conn.close unless conn.closed?
        Thread.current[fiber_key] = nil
      end
      @retry  = nil
      @status = "NOT CONNECTED"
    end

  end
end

module EM
  module SocketConnection
    include EM::Deferrable

    # Opens a connection through EM.connect with this module passed as the
    # handler, and returns whatever EM.connect returns.
    #
    # host, port - forwarded unchanged to EM.connect.
    # timeout    - assigned to pending_connect_timeout on the object that
    #              EM.connect yields to the block.
    def self.connect(host, port, timeout)
      EM.connect(host, port, self) do |conn|
        conn.pending_connect_timeout = timeout
      end
    end

    # Sets up the receive buffer state for a fresh connection.
    def initialize
      # Incoming data is appended here by receive_data.
      @buf = ''
      # Offset into @buf of the first character not yet handed out by yank.
      @index = 0
      # Flipped to true in post_init.
      @connected = false
    end

    # True whenever @connected is falsy: before post_init has run, after
    # close, or after unbind has cleared the flag.
    def closed?
      !@connected
    end

    # Marks the connection as closed and asks EventMachine to close it.
    #
    # @connected is cleared first, so if unbind runs afterwards it takes its
    # `fail` branch rather than the "remote end went away" branch.
    # NOTE(review): the `true` argument is presumably EventMachine's
    # after_writing flag (flush queued output before closing) -- confirm
    # against EventMachine::Connection#close_connection.
    def close
      @connected = false
      close_connection(true)
    end

    # Sends buf over the connection by delegating to send_data and returns
    # whatever send_data returns. Gives the connection an IO-like #write.
    def write(buf)
      send_data(buf)
    end

    # Returns +size+ characters of buffered data.
    #
    # When that much is already buffered it is returned immediately.
    # Otherwise the request is recorded in @size/@callback and the current
    # fiber is suspended; receive_data resumes it with the data once enough
    # has arrived, and that data becomes the return value.
    def read(size)
      return yank(size) if can_read?(size)

      waiter = Fiber.current
      @size = size
      @callback = proc { |chunk| waiter.resume(chunk) }
      # TODO Can leak fiber if the connection dies while
      # this fiber is yielded, waiting for data
      Fiber.yield
    end
    
    SEP = "\r\n"

    # Reads one line from the connection and returns it as a String that
    # ends with SEP.
    #
    # Data is pulled through read and yank, so the calling fiber is
    # suspended (inside read) whenever the terminator has not arrived yet.
    #
    # The line is accumulated across iterations, so a line delivered in
    # several receive_data chunks is returned whole. Only one character is
    # consumed before searching, and a terminator split between the
    # accumulated text and the buffer is detected, so short lines
    # (including an empty line consisting only of SEP) are handled.
    def gets
      # read(1) waits until at least one character is available.
      line = read(1)
      until line.end_with?(SEP)
        unread = @buf.size - @index
        if unread.zero? || line.end_with?(SEP[0])
          # Either nothing is buffered (wait for more), or the line ends in
          # the first character of SEP: take exactly one character so a
          # terminator straddling `line` and the buffer is recognised.
          line << read(1)
        else
          eol = @buf.index(SEP, @index)
          # Take through the terminator when it is buffered, otherwise
          # everything that has arrived so far.
          line << yank(eol ? eol - @index + SEP.size : unread)
        end
      end
      line
    end

    # True when at least +size+ unread characters remain in @buf past @index.
    def can_read?(size)
      @index + size <= @buf.size
    end

    # EM callbacks

    # Appends incoming data to @buf and, when a read is pending (@callback
    # set) and @size characters are now available, hands that many
    # characters to the pending callback.
    #
    # Returns the callback's result, or nil when no callback was invoked.
    def receive_data(data)
      @buf << data

      ready = @callback && can_read?(@size)
      return unless ready

      pending = @callback
      chunk = yank(@size)
      # Clear the request before invoking it, so a new request registered
      # from inside the callback is not wiped out afterwards.
      @callback = nil
      @size = nil
      pending.call(chunk)
    end

    # EM callback. Marks the connection as connected and then calls succeed
    # (presumably from the EM::Deferrable mixin included above), which is
    # what triggers callbacks registered on this connection.
    #
    # The flag is set before succeed so that callbacks already see
    # closed? == false.
    # NOTE(review): EventMachine documents connection_completed as the hook
    # for an established outbound connection -- confirm that signalling
    # success from post_init is intended.
    def post_init
      @connected = true
      succeed
    end

    # EM callback for the end of the connection.
    #
    # If the connection was not marked connected (never connected, or
    # already closed through #close), calls fail. Otherwise just records
    # that the connection is no longer connected.
    def unbind
      return fail unless @connected

      @connected = false
    end
    
    private
    
    BUFFER_SIZE = 4096

    # Removes and returns up to +len+ characters from the unread part of
    # @buf by advancing @index (never past the end of the buffer).
    #
    # Once @index reaches BUFFER_SIZE the consumed prefix is dropped from
    # @buf and @index restarts at zero, so the buffer does not keep
    # already-delivered data indefinitely.
    def yank(len)
      chunk = @buf.slice(@index, len)
      @index = [@index + len, @buf.size].min
      if @index >= BUFFER_SIZE
        @buf = @buf.slice(@index..-1)
        @index = 0
      end
      chunk
    end
    
  end
end