File: notifier.rb

package info (click to toggle)
ruby-gelf 3.1.0-3.1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 244 kB
  • sloc: ruby: 1,039; makefile: 2
file content (280 lines) | stat: -rw-r--r-- 8,843 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
require 'gelf/transport/udp'
require 'gelf/transport/tcp'
require 'gelf/transport/tcp_tls'

# replace JSON and #to_json with Yajl if available
begin
  require 'yajl/json_gem'
rescue LoadError
end

module GELF
  # Graylog2 notifier.
  class Notifier
    # Maximum number of GELF chunks as per GELF spec
    MAX_CHUNKS = 128
    MAX_CHUNK_SIZE_WAN = 1420
    MAX_CHUNK_SIZE_LAN = 8154

    attr_accessor :enabled, :collect_file_and_line, :rescue_network_errors
    attr_reader :max_chunk_size, :level, :default_options, :level_mapping

    # +host+ and +port+ are host/ip and port of graylog2-server.
    # +max_size+ is passed to max_chunk_size=.
    # +default_options+ is used in notify!
    def initialize(host = 'localhost', port = 12201, max_size = 'WAN', default_options = {})
      @enabled = true
      @collect_file_and_line = true
      @random = Random.new

      self.level = GELF::DEBUG
      self.max_chunk_size = max_size
      self.rescue_network_errors = false

      self.default_options = default_options.dup
      self.default_options['version'] = SPEC_VERSION
      self.default_options['host'] ||= Socket.gethostname
      self.default_options['level'] ||= GELF::UNKNOWN
      self.default_options['facility'] ||= 'gelf-rb'
      self.default_options['protocol'] ||= GELF::Protocol::UDP

      self.level_mapping = :logger
      @sender = create_sender(host, port)
    end

    # Get a list of receivers.
    #    notifier.addresses  # => [['localhost', 12201], ['localhost', 12202]]
    def addresses
      @sender.addresses
    end

    # Set a list of receivers.
    #    notifier.addresses = [['localhost', 12201], ['localhost', 12202]]
    def addresses=(addrs)
      @sender.addresses = addrs
    end

    # +size+ may be a number of bytes, 'WAN' (1420 bytes) or 'LAN' (8154).
    # Default (safe) value is 'WAN'.
    def max_chunk_size=(size)
      case size.to_s.downcase
        when 'wan'
          @max_chunk_size = MAX_CHUNK_SIZE_WAN
        when 'lan'
          @max_chunk_size = MAX_CHUNK_SIZE_LAN
        else
          @max_chunk_size = size.to_int
      end
    end

    def level=(new_level)
      @level = if new_level.is_a?(Integer)
                 new_level
               else
                 GELF.const_get(new_level.to_s.upcase)
               end
    end

    def default_options=(options)
      @default_options = self.class.stringify_keys(options)
    end

    # +mapping+ may be a hash, 'logger' (GELF::LOGGER_MAPPING) or 'direct' (GELF::DIRECT_MAPPING).
    # Default (compatible) value is 'logger'.
    def level_mapping=(mapping)
      case mapping.to_s.downcase
        when 'logger'
          @level_mapping = GELF::LOGGER_MAPPING
        when 'direct'
          @level_mapping = GELF::DIRECT_MAPPING
        else
          @level_mapping = mapping
      end
    end

    def disable
      @enabled = false
    end

    def enable
      @enabled = true
    end

    # Closes sender
    def close
      @sender.close
    end

    # Same as notify!, but rescues all exceptions (including +ArgumentError+)
    # and sends them instead.
    def notify(*args)
      notify_with_level(nil, *args)
    end

    # Sends message to Graylog2 server.
    # +args+ can be:
    # - hash-like object (any object which responds to +to_hash+, including +Hash+ instance):
    #    notify!(:short_message => 'All your rebase are belong to us', :user => 'AlekSi')
    # - exception with optional hash-like object:
    #    notify!(SecurityError.new('ALARM!'), :trespasser => 'AlekSi')
    # - string-like object (anything which responds to +to_s+) with optional hash-like object:
    #    notify!('Plain olde text message', :scribe => 'AlekSi')
    # Resulted fields are merged with +default_options+, the latter will never overwrite the former.
    # This method will raise +ArgumentError+ if arguments are wrong. Consider using notify instead.
    def notify!(*args)
      notify_with_level!(nil, *args)
    end

    GELF::Levels.constants.each do |const|
      define_method(const.downcase) do |*args|
        level = GELF.const_get(const)
        notify_with_level(level, *args)
      end
    end

  private

    def create_sender(host, port)
      addresses = [[host, port]]
      if default_options['protocol'] == GELF::Protocol::TCP
        if default_options.key?('tls')
          tls_options = default_options.delete('tls')
          GELF::Transport::TCPTLS.new(addresses, tls_options)
        else
          GELF::Transport::TCP.new(addresses)
        end
      else
        GELF::Transport::UDP.new(addresses)
      end
    end

    def notify_with_level(message_level, *args)
      notify_with_level!(message_level, *args)
    rescue SocketError, SystemCallError
      raise unless rescue_network_errors
    rescue Exception => exception
      notify_with_level!(GELF::UNKNOWN, exception)
    end

    def notify_with_level!(message_level, *args)
      return unless @enabled
      hash = extract_hash(*args)
      hash['level'] = message_level unless message_level.nil?
      if hash['level'] >= level
        if default_options['protocol'] == GELF::Protocol::TCP
          validate_hash(hash)
          @sender.send(hash.to_json + "\0")
        else
          @sender.send_datagrams(datagrams_from_hash(hash))
        end
      end
    end

    def extract_hash(object = nil, args = {})
      primary_data = if object.respond_to?(:to_hash)
                       object.to_hash
                     elsif object.is_a?(Exception)
                       args['level'] ||= GELF::ERROR
                       self.class.extract_hash_from_exception(object)
                     else
                       args['level'] ||= GELF::INFO
                       { 'short_message' => object.to_s }
                     end

      hash = default_options.merge(self.class.stringify_keys(args.merge(primary_data)))
      convert_hoptoad_keys_to_graylog2(hash)
      set_file_and_line(hash) if @collect_file_and_line
      set_timestamp(hash)
      check_presence_of_mandatory_attributes(hash)
      hash
    end

    def self.extract_hash_from_exception(exception)
      bt = exception.backtrace || ["Backtrace is not available."]
      {
        'short_message' => "#{exception.class}: #{exception.message}",
        'full_message' => "Backtrace:\n" + bt.join("\n")
      }
    end

    # Converts Hoptoad-specific keys in +@hash+ to Graylog2-specific.
    def convert_hoptoad_keys_to_graylog2(hash)
      if hash['short_message'].to_s.empty?
        if hash.has_key?('error_class') && hash.has_key?('error_message')
          hash['short_message'] = hash.delete('error_class') + ': ' + hash.delete('error_message')
        end
      end
    end

    CALLER_REGEXP = /^(.*):(\d+).*/
    LIB_GELF_PATTERN = File.join('lib', 'gelf')

    def set_file_and_line(hash)
      stack = caller
      frame = stack.find { |f| !f.include?(LIB_GELF_PATTERN) }
      match = CALLER_REGEXP.match(frame)
      hash['file'] = match[1]
      hash['line'] = match[2].to_i
    end

    def set_timestamp(hash)
      hash['timestamp'] = Time.now.utc.to_f if hash['timestamp'].nil?
    end

    def check_presence_of_mandatory_attributes(hash)
      %w(version short_message host).each do |attribute|
        if hash[attribute].to_s.empty?
          raise ArgumentError.new("#{attribute} is missing. Options version, short_message and host must be set.")
        end
      end
    end

    def datagrams_from_hash(hash)
      data = serialize_hash(hash)
      datagrams = []

      # Maximum total size is 8192 byte for UDP datagram. Split to chunks if bigger. (GELF v1.0 supports chunking)
      if data.count > @max_chunk_size
        id = @random.bytes(8)
        msg_id = Digest::MD5.digest("#{Time.now.to_f}-#{id}")[0, 8]
        num, count = 0, (data.count.to_f / @max_chunk_size).ceil
        if count > MAX_CHUNKS
          raise ArgumentError, "Data too big (#{data.count} bytes), would create more than #{MAX_CHUNKS} chunks!"
        end
        data.each_slice(@max_chunk_size) do |slice|
          datagrams << "\x1e\x0f" + msg_id + [num, count, *slice].pack('C*')
          num += 1
        end
      else
        datagrams << data.to_a.pack('C*')
      end

      datagrams
    end

    def validate_hash(hash)
      raise ArgumentError.new("Hash is empty.") if hash.nil? || hash.empty?
      hash['level'] = @level_mapping[hash['level']]
    end

    def serialize_hash(hash)
      validate_hash(hash)

      Zlib::Deflate.deflate(hash.to_json).bytes
    end

    def self.stringify_keys(data)
      return data unless data.is_a? Hash

      data.each_with_object({}) do |(key, value), obj|
        key_s = key.to_s

        if (key != key_s) && data.key?(key_s)
          raise ArgumentError, "Both #{key.inspect} and #{key_s} are present."
        end

        obj[key_s] = value
      end
    end
  end
end