1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
|
require 'socket'
module Moneta
module Adapters
# Moneta client backend
# @api public
class Client < Adapter
# @!method initialize(options = {})
# @param [Hash] options
# @option options [TCPSocket | UNIXSocket] :backend an open socket to use
# @option options [Integer] :port (9000) TCP port
# @option options [String] :host ('127.0.0.1') Hostname
# @option options [String] :socket Unix socket file name as alternative to `:port` and `:host`
backend do |socket: nil, host: '127.0.0.1', port: 9000|
if socket
UNIXSocket.open(socket)
else
TCPSocket.open(host, port)
end
end
# (see Proxy#key?)
def key?(key, options = {})
write(:key?, key, options)
read_msg
end
# (see Proxy#load)
def load(key, options = {})
write(:load, key, options)
read_msg
end
# (see Proxy#store)
def store(key, value, options = {})
write(:store, key, value, options)
read_msg
value
end
# (see Proxy#delete)
def delete(key, options = {})
write(:delete, key, options)
read_msg
end
# (see Proxy#increment)
def increment(key, amount = 1, options = {})
write(:increment, key, amount, options)
read_msg
end
# (see Proxy#create)
def create(key, value, options = {})
write(:create, key, value, options)
read_msg
end
# (see Proxy#clear)
def clear(options = {})
write(:clear, options)
read_msg
self
end
# (see Proxy#close)
def close
backend.close
nil
end
# (see Proxy#each_key)
def each_key
raise NotImplementedError, 'each_key is not supported' unless supports?(:each_key)
return enum_for(:each_key) unless block_given?
begin
write(:each_key)
yield_break = false
loop do
write('NEXT')
# A StopIteration error will be raised by this call if the server
# reached the end of the enumeration. This will stop the loop
# automatically.
result = read_msg
# yield_break will be true in the ensure block (below) if anything
# happened during the yield to stop further enumeration.
yield_break = true
yield result
yield_break = false
end
ensure
write('BREAK') if yield_break
read_msg # nil return from each_key
end
self
end
# (see Default#features)
def features
@features ||=
begin
write(:features)
read_msg.freeze
end
end
private
def write(*args)
s = Marshal.dump(args)
backend.write([s.bytesize].pack('N') << s)
end
# JRuby doesn't support socket#recv with flags
if defined?(JRUBY_VERSION)
def read(bytes)
received = backend.read(bytes)
raise EOFError, "Server closed socket" unless received && received.bytesize == bytes
received
end
else
def read(bytes)
received = backend.recv(bytes, Socket::MSG_WAITALL)
raise EOFError, "Server closed socket" unless received && received.bytesize == bytes
received
end
end
def read_msg
size = read(4).unpack1('N')
result = Marshal.load(read(size))
raise result if Exception === result
result
end
end
end
end
|