1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
|
# frozen_string_literal: true
require 'riemann'
module Riemann
class Client
# Root of the client's exception hierarchy; every error class below derives from it.
class Error < RuntimeError; end
# NOTE(review): presumably signals a reply that could not be understood; raise sites are outside this file, confirm.
class InvalidResponse < Error; end
# NOTE(review): presumably wraps a failure reported by the server; confirm against the transports.
class ServerError < Error; end
# Parent class for errors about operations that cannot be carried out as requested (see TooBig).
class Unsupported < Error; end
# Rescued by #send_maybe_recv, which then retries the same message over TCP.
class TooBig < Unsupported; end
require 'socket'
require 'time'
# Fallback values applied by #initialize when the matching option is missing.
HOST = '127.0.0.1'
PORT = 5555
# NOTE(review): unit is presumably seconds; confirm in the TCP/UDP transports.
TIMEOUT = 5
require 'riemann/client/tcp'
require 'riemann/client/udp'
# Transport objects created in #initialize (TCP.new / UDP.new).
attr_reader :tcp, :udp
# Builds a client together with its UDP and TCP transports.
#
# +opts+ is copied before defaults are applied, so the caller's hash is not
# modified. Any of :host, :port and :timeout that is missing (or nil/false)
# falls back to HOST, PORT and TIMEOUT. The resulting options hash is passed
# to both UDP.new and TCP.new.
#
# When a block is given, the client is yielded to it and #close is called
# once the block finishes, even if the block raises.
def initialize(opts = {})
  @options = opts.dup
  { host: HOST, port: PORT, timeout: TIMEOUT }.each do |key, fallback|
    @options[key] ||= fallback
  end

  @udp = UDP.new(@options)
  @tcp = TCP.new(@options)

  if block_given?
    begin
      yield self
    ensure
      close
    end
  end
end
# Host stored in the client options (HOST when none was supplied to #initialize).
def host
  @options[:host]
end
# Port stored in the client options (PORT when none was supplied to #initialize).
def port
  @options[:port]
end
# Timeout stored in the client options (TIMEOUT when none was supplied to #initialize).
def timeout
  @options[:timeout]
end
# Sends a single event or state.
#
# Accepts a Riemann::State, a Riemann::Event or a Hash; any other object
# raises ArgumentError. The event is wrapped in a one-element array and
# handed to #bulk_send, whose result is returned.
def <<(event)
  supported = case event
              when Riemann::State, Riemann::Event, Hash then true
              else false
              end
  raise ArgumentError, "Unsupported event class: #{event.class.name}" unless supported

  bulk_send([event])
end
# Sends several events in a single message.
#
# +events+ must be an Array, otherwise ArgumentError is raised. The events
# are passed through #normalize_events, wrapped in a Riemann::Message and
# handed to #send_maybe_recv, whose result is returned.
def bulk_send(events)
  raise ArgumentError unless events.is_a?(Array)

  normalized = normalize_events(events)
  send_maybe_recv(Riemann::Message.new(events: normalized))
end
# Converts a list of events into objects ready to be placed in a message.
#
# Riemann::State and Riemann::Event instances are passed through untouched.
# A Hash is turned into a Riemann::Event; when it has no :host key, the
# local hostname (Socket.gethostname) is added to a copy, so the caller's
# hash is never modified. The hostname is looked up at most once per call
# rather than once per event.
#
# Raises ArgumentError for an element of any other class.
def normalize_events(events)
  local_host = nil # resolved lazily, only if some hash lacks :host

  events.map do |event|
    case event
    when Riemann::State, Riemann::Event
      event
    when Hash
      attributes = if event.key?(:host)
                     event
                   else
                     local_host ||= Socket.gethostname
                     # Hash#merge returns a new hash, so no explicit #dup is needed.
                     event.merge(host: local_host)
                   end
      Riemann::Event.new(attributes)
    else
      raise ArgumentError, "Unsupported event class: #{event.class.name}"
    end
  end
end
# Runs +query+ through #query and returns the union of the response's
# events and states as one array. A nil list is treated as empty, and
# duplicates are removed by the array union.
def [](query)
  response = query(query)
  events = response.events || []
  states = response.states || []
  events | states
end
# Deprecated no-op kept for backwards compatibility: connections are made
# automatically on send, so this only emits a deprecation warning.
def connect
  warn('Riemann client#connect is deprecated')
end
# Closes both the UDP and the TCP transport.
#
# The TCP transport is closed even when closing the UDP transport raises,
# so a failure on one socket does not leak the other; the error still
# propagates to the caller afterwards. On success, returns the result of
# closing the TCP transport.
def close
  begin
    @udp.close
  ensure
    tcp_result = @tcp.close
  end
  tcp_result
end
# Truthy only when both transports report being connected. The UDP
# transport is not asked when the TCP transport is not connected.
def connected?
  tcp.connected? && udp.connected?
end
# Sends a query to the server and returns the response from #send_recv.
#
# +string+ is the query text wrapped in a Riemann::Query; it defaults to
# the literal 'true'.
def query(string = 'true')
  request = Riemann::Query.new(string: string)
  send_recv(Riemann::Message.new(query: request))
end
# Sends +message+ through the TCP transport's #send_recv and returns its result.
def send_recv(message)
  @tcp.send_recv(message)
end
# Sends +message+ through the UDP transport first. If that raises TooBig,
# the same message is sent through the TCP transport instead. Returns the
# result of whichever transport handled the message.
def send_maybe_recv(message)
  @udp.send_maybe_recv(message)
rescue TooBig
  @tcp.send_maybe_recv(message)
end
end
end
|