File: client.rb

package info (click to toggle)
ruby-riemann-client 1.2.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 228 kB
  • sloc: ruby: 1,271; makefile: 2
file content (130 lines) | stat: -rw-r--r-- 2,666 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# frozen_string_literal: true

require 'riemann'

module Riemann
  class Client
    # Root of the client's error hierarchy; every error class declared here
    # descends from it, so `rescue Riemann::Client::Error` covers them all.
    class Error < RuntimeError
    end

    # NOTE(review): raise sites are not in this file — presumably signals a
    # server reply that could not be understood; confirm in the transports.
    class InvalidResponse < Error
    end

    # NOTE(review): raise sites are not in this file — presumably signals an
    # error reported by the Riemann server; confirm in the transports.
    class ServerError < Error
    end

    # Parent class for "this cannot be done this way" failures such as TooBig.
    class Unsupported < Error
    end

    # Rescued by Client#send_maybe_recv, which then retries the same message
    # over TCP.
    class TooBig < Unsupported
    end

    require 'socket'
    require 'time'

    # Host used by #initialize when the :host option is missing or falsy.
    HOST = '127.0.0.1'
    # Port used by #initialize when the :port option is missing or falsy.
    PORT = 5555
    # Timeout used by #initialize when the :timeout option is missing or falsy.
    # NOTE(review): unit is not visible here — presumably seconds; confirm in
    # the TCP/UDP transports.
    TIMEOUT = 5

    require 'riemann/client/tcp'
    require 'riemann/client/udp'

    # Transport objects created in #initialize: a TCP instance and a UDP
    # instance, both built from the same options hash.
    attr_reader :tcp, :udp

    # Builds a client together with its UDP and TCP transports.
    #
    # The options hash is shallow-copied, so the caller's hash is never
    # modified. Missing (or falsy) :host, :port and :timeout entries are
    # filled in from HOST, PORT and TIMEOUT. Both transports receive the very
    # same resulting options hash.
    #
    # When a block is given, the client is yielded to it and #close is called
    # afterwards, even if the block raises.
    #
    # @param opts [Hash] client options (:host, :port, :timeout; any other
    #   keys are passed through to the transports untouched)
    # @yieldparam client [Riemann::Client] the freshly built client
    def initialize(opts = {})
      @options = opts.dup
      { host: HOST, port: PORT, timeout: TIMEOUT }.each do |key, fallback|
        @options[key] ||= fallback
      end

      @udp = UDP.new(@options)
      @tcp = TCP.new(@options)

      if block_given?
        begin
          yield self
        ensure
          # Block form owns the client's lifetime: always release it.
          close
        end
      end
    end

    # @return the :host option in effect (HOST when none was supplied to
    #   #initialize)
    def host
      @options[:host]
    end

    # @return the :port option in effect (PORT when none was supplied to
    #   #initialize)
    def port
      @options[:port]
    end

    # @return the :timeout option in effect (TIMEOUT when none was supplied to
    #   #initialize)
    def timeout
      @options[:timeout]
    end

    # Sends a single event by wrapping it in a one-element array and handing
    # it to #bulk_send.
    #
    # @param event [Riemann::State, Riemann::Event, Hash] the event to send
    # @return whatever #bulk_send returns
    # @raise [ArgumentError] when +event+ is none of the accepted types
    def <<(event)
      # `===` keeps the exact matching semantics of a `case/when` on classes.
      accepted = [Riemann::State, Riemann::Event, Hash].any? { |type| type === event }
      raise(ArgumentError, "Unsupported event class: #{event.class.name}") unless accepted

      bulk_send([event])
    end

    # Sends several events in a single Riemann::Message.
    #
    # Every element is first passed through #normalize_events, then the
    # resulting message is handed to #send_maybe_recv.
    #
    # @param events [Array<Riemann::State, Riemann::Event, Hash>] events to send
    # @return whatever #send_maybe_recv returns
    # @raise [ArgumentError] when +events+ is not an Array, or (via
    #   #normalize_events) when an element has an unsupported class
    def bulk_send(events)
      unless events.is_a?(Array)
        # Name the offending class so the caller can see what was passed.
        raise ArgumentError, "events must be an Array, got #{events.class.name}"
      end

      message = Riemann::Message.new(events: normalize_events(events))

      send_maybe_recv(message)
    end

    # Converts a list of mixed event representations into objects ready to be
    # placed in a Riemann::Message.
    #
    # * Riemann::State and Riemann::Event instances are passed through as-is.
    # * Hashes are wrapped in Riemann::Event.new; a hash without a :host key
    #   gets the local hostname added (on a copy — the caller's hash is left
    #   untouched).
    #
    # @param events [Enumerable] the events to normalize
    # @return [Array] one normalized entry per input element, in order
    # @raise [ArgumentError] when an element is none of the supported types
    def normalize_events(events)
      # Resolved lazily and at most once per call, instead of once per event.
      local_host = nil

      events.map do |event|
        case event
        when Riemann::State, Riemann::Event
          event
        when Hash
          attributes = if event.include?(:host)
                         event
                       else
                         local_host ||= Socket.gethostname
                         # Hash#merge already returns a new hash; no dup needed.
                         event.merge(host: local_host)
                       end
          Riemann::Event.new(attributes)
        else
          raise(ArgumentError, "Unsupported event class: #{event.class.name}")
        end
      end
    end

    # Runs +query+ against the server and returns the matches.
    #
    # The result is the set union (Array#|, so duplicates are dropped) of the
    # response's events and states; a missing list counts as empty.
    #
    # @param query [String] query string handed to #query
    # @return [Array] events followed by any states not already present
    def [](query)
      response = query(query)
      events = response.events || []
      states = response.states || []
      events | states
    end

    # Deprecated no-op kept for backward compatibility: it opens nothing and
    # only emits a deprecation warning via Kernel#warn.
    #
    # @return [nil]
    def connect
      # NOTE: connections are made automatically on send
      warn 'Riemann client#connect is deprecated'
    end

    # Close both UDP and TCP sockets.
    #
    # The TCP transport is closed even when closing the UDP transport raises,
    # so a failure on one side cannot leak the other. The UDP error (if any)
    # still propagates to the caller afterwards.
    #
    # @return whatever the TCP transport's #close returns
    def close
      tcp_result = nil
      begin
        @udp.close
      ensure
        # Runs on both the normal and the exceptional path.
        tcp_result = @tcp.close
      end
      tcp_result
    end

    # Reports whether both transports consider themselves connected.
    #
    # The UDP transport is only asked when the TCP transport answered truthy.
    #
    # @return the TCP answer when it is falsy, otherwise the UDP answer
    def connected?
      tcp.connected? && udp.connected?
    end

    # Sends a query to the server and returns the reply.
    #
    # The string is wrapped in a Riemann::Query inside a Riemann::Message and
    # sent through #send_recv.
    #
    # @param string [String] query expression; defaults to 'true'
    # @return whatever #send_recv returns for the query message
    def query(string = 'true')
      request = Riemann::Message.new(
        query: Riemann::Query.new(string: string)
      )
      send_recv(request)
    end

    # Sends +message+ through the TCP transport and returns its result.
    # Used by #query.
    #
    # @param message the message to send (a Riemann::Message when called from
    #   #query)
    # @return whatever the TCP transport's #send_recv returns
    def send_recv(message)
      @tcp.send_recv(message)
    end

    # Sends +message+ through the UDP transport first; when that raises TooBig,
    # the same message is sent through the TCP transport instead.
    # Used by #bulk_send.
    #
    # @param message the message to send (a Riemann::Message when called from
    #   #bulk_send)
    # @return whatever the transport that handled the message returns from its
    #   #send_maybe_recv
    def send_maybe_recv(message)
      @udp.send_maybe_recv(message)
    rescue TooBig
      # NOTE(review): presumably raised when the message exceeds what fits in
      # a datagram — confirm in the UDP transport. Any other error propagates.
      @tcp.send_maybe_recv(message)
    end
  end
end