File: entry_parser.rb

package info (click to toggle)
ruby-prometheus-client-mmap 1.2.9-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 700 kB
  • sloc: ruby: 3,149; sh: 54; makefile: 21
file content (132 lines) | stat: -rw-r--r-- 4,190 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
require 'prometheus/client/helper/json_parser'

module Prometheus
  module Client
    module Helper
      # Mixin that decodes metric entries out of a binary metrics file.
      #
      # The including object must supply three methods that are not defined
      # here: `slice(range)` (returns the bytes in that range, or nil),
      # `size` (total number of bytes) and `filepath` (path of the backing
      # `.db` file; its basename encodes type, mode and pid).
      #
      # Layout, as read by the code below:
      #   bytes 0..3   number of used bytes, native signed 32-bit ('l')
      #   byte  8..    entries, each one being
      #                  - 4-byte native unsigned length of the encoded key ('L')
      #                  - the encoded key itself
      #                  - 1..8 bytes of padding, so the value starts at a
      #                    multiple of 8 bytes from the start of the entry
      #                  - the value, an 8-byte native double ('d')
      module EntryParser
        # Raised when the file contents cannot be decoded and the caller did
        # not ask for errors to be ignored.
        class ParsingError < RuntimeError
        end

        MINIMUM_SIZE = 8         # anything smaller is treated as empty
        START_POSITION = 8       # offset of the first entry
        VALUE_BYTES = 8          # width of the stored value
        ENCODED_LENGTH_BYTES = 4 # width of the key-length prefix

        # @return [Integer] the "used" counter stored in the first four bytes
        def used
          slice(0..3).unpack('l')[0]
        end

        # Underscore-separated components of the file's basename (without the
        # `.db` extension), each with a trailing `-<digits>` suffix removed.
        # Memoized in @parts.
        #
        # @return [Array<String>]
        def parts
          @parts ||= File.basename(filepath, '.db')
                       .split('_')
                       .map { |e| e.gsub(/-\d+$/, '') } # remove trailing -number
        end

        # @return [Symbol] first filename component, e.g. :gauge
        def type
          parts[0].to_sym
        end

        # @return [String] every filename component from the third onwards,
        #   re-joined with '_' ('' when there are fewer than three)
        def pid
          (parts[2..-1] || []).join('_')
        end

        # @return [String, nil] second filename component
        def multiprocess_mode
          parts[1]
        end

        # @return [Boolean] true when the file is too small to hold a header
        #   or its "used" counter is zero
        def empty?
          size < MINIMUM_SIZE || used.zero?
        end

        # Walks the file and lazily yields one tuple per entry:
        # `[data, encoded_len, value_offset, pos]`, where `data` is the slice
        # starting at the entry, `encoded_len` is the key length,
        # `value_offset` is the offset of the value inside `data`, and `pos`
        # is the absolute offset of the value in the file.
        #
        # @param ignore_errors [Boolean] skip undecodable regions instead of
        #   raising
        # @return [Enumerator]
        # @raise [ParsingError] (on enumeration) when a slice is missing or an
        #   entry's value would run past the end of the file, unless
        #   ignore_errors is true
        def entries(ignore_errors = false)
          return Enumerator.new {} if empty?

          Enumerator.new do |yielder|
            used_bytes = used # read the header once, not on every iteration

            pos = START_POSITION
            while pos < used_bytes && pos < size && pos > 0
              data = slice(pos..-1)
              unless data
                raise ParsingError, "data slice is nil at pos #{pos}" unless ignore_errors
                pos += 8
                next
              end

              encoded_len, first_encoded_bytes = data.unpack('LL')
              if encoded_len.nil? || encoded_len.zero? || first_encoded_bytes.nil? || first_encoded_bytes.zero?
                # Nothing written here (zero length or zeroed key bytes):
                # step forward eight bytes and try again.
                pos += 8
                next
              end

              entry_len = ENCODED_LENGTH_BYTES + encoded_len
              # Always 1..8: an already aligned entry still gets 8 bytes.
              padding_len = 8 - entry_len % 8

              value_offset = entry_len + padding_len # align to 8 bytes
              pos += value_offset # pos now points at the value

              if value_offset > 0 && (pos + VALUE_BYTES) <= size # if positions are safe
                yielder.yield data, encoded_len, value_offset, pos
              else
                # Reached when the value would extend past the end of the
                # file; the message text is kept as-is for compatibility.
                raise ParsingError, "data slice is nil at pos #{pos}" unless ignore_errors
              end
              pos += VALUE_BYTES
            end
          end
        end

        # Decodes every entry into an `[encoded_key, value]` pair.
        #
        # @param ignore_errors [Boolean] drop entries that fail to unpack
        #   instead of raising
        # @return [Array<Array(String, Float)>]
        # @raise [ParsingError] when unpacking fails and ignore_errors is false
        def parsed_entries(ignore_errors = false)
          result = entries(ignore_errors).map do |data, encoded_len, value_offset, _|
            begin
              # Skip the 4-byte length, read the key, jump to the value.
              encoded, value = data.unpack(format('@4A%d@%dd', encoded_len, value_offset))
              [encoded, value]
            rescue ArgumentError => e
              Prometheus::Client.logger.debug("Error processing data: #{bin_to_hex(data[0, 7])} len: #{encoded_len} value_offset: #{value_offset}")
              raise ParsingError, e.message unless ignore_errors
            end
          end

          # A swallowed error leaves nil in the mapped result.
          result.compact! if ignore_errors
          result
        end

        # Folds this file's entries into `metrics`, keyed by metric name.
        # Each key is handed to JsonParser.load (defined elsewhere) and is
        # destructured as `[metric_name, name, labelnames, labelvalues]`.
        #
        # @param metrics [Hash] accumulator; updated in place and returned
        # @param ignore_errors [Boolean] skip keys that fail to parse instead
        #   of raising
        # @return [Hash] the same `metrics` hash
        # @raise [ParsingError] when a key raises JSON::ParserError and
        #   ignore_errors is false
        def to_metrics(metrics = {}, ignore_errors = false)
          parsed_entries(ignore_errors).each do |key, value|
            begin
              metric_name, name, labelnames, labelvalues = JsonParser.load(key)
              labels = (labelnames || []).zip(labelvalues || [])

              metric = metrics.fetch(metric_name) do
                {
                  metric_name: metric_name,
                  help: 'Multiprocess metric',
                  type: type,
                  samples: []
                }
              end

              if type == :gauge
                # Gauge samples additionally carry the pid as a label.
                metric[:multiprocess_mode] = multiprocess_mode
                labels += [['pid', pid]]
              end
              # For non-gauge types the original note says duplicates and
              # labels are reconciled in a later step (not visible here).
              metric[:samples] += [[name, labels, value]]
              metrics[metric_name] = metric
            rescue JSON::ParserError => e
              raise ParsingError, e.message unless ignore_errors
            end
          end

          # Hash#reject! yields (key, value): this drops a metric that ended
          # up stored under a nil name.
          metrics.reject! { |metric_name, _| metric_name.nil? } if ignore_errors
          metrics
        end

        private

        # Hex dump used in debug logging; every byte is rendered as exactly
        # two lowercase hex digits so the output is unambiguous.
        def bin_to_hex(s)
          s.each_byte.map { |b| format('%02x', b) }.join
        end
      end
    end
  end
end