1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
|
require 'riemann'
class Riemann::Client
class Error < RuntimeError; end
class InvalidResponse < Error; end
class ServerError < Error; end
class Unsupported < Error; end
class TooBig < Unsupported; end
require 'thread'
require 'socket'
require 'time'
HOST = '127.0.0.1'
PORT = 5555
TIMEOUT = 5
require 'riemann/client/tcp'
require 'riemann/client/udp'
attr_reader :tcp, :udp
def initialize(opts = {})
@options = opts.dup
@options[:host] ||= HOST
@options[:port] ||= PORT
@options[:timeout] ||= TIMEOUT
@udp = UDP.new(@options)
@tcp = TCP.new(@options)
end
def host
@options[:host]
end
def port
@options[:port]
end
def timeout
@options[:timeout]
end
# Send a state
def <<(event_opts)
# Create state
case event_opts
when Riemann::State
event = event_opts
when Riemann::Event
event = event_opts
else
unless event_opts.include? :host
event_opts[:host] = Socket.gethostname
end
event = Riemann::Event.new(event_opts)
end
message = Riemann::Message.new :events => [event]
# Transmit
send_maybe_recv message
end
# Returns an array of states matching query.
def [](query)
response = query(query)
(response.events || []) |
(response.states || [])
end
# Close both UDP and TCP sockets.
def close
@udp.close
@tcp.close
end
# Connect both UDP and TCP sockets.
def connect
udp.connect
tcp.connect
end
def connected?
tcp.connected? and udp.connected?
end
# Ask for states
def query(string = "true")
send_recv Riemann::Message.new(:query => Riemann::Query.new(:string => string))
end
def send_recv(*a)
@tcp.send_recv *a
end
def send_maybe_recv(*a)
begin
@udp.send_maybe_recv *a
rescue TooBig
@tcp.send_maybe_recv *a
end
end
end
|