1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154
|
require 'prometheus/client'
require 'prometheus/client/mmaped_dict'
require 'json'
module Prometheus
  module Client
    # A float protected by a mutex backed by a per-process mmaped file.
    #
    # The current value is cached in @value and handed to a MmapedDict on
    # every update. Dicts are registered per file prefix in a class-level
    # hash, so values with the same prefix use the same dict. Whenever the
    # recorded pid differs from Process.pid the registry is dropped and the
    # file is opened again before the value is touched.
    class MmapedValue
      # Serializes access to the class-level registry below and the
      # (re)opening of files.
      VALUE_LOCK = Mutex.new

      # Open MmapedDict instances keyed by file prefix.
      @@files = {}
      # Pid the registry was built for; -1 until the first initialization.
      @@pid = -1

      # type              - metric type; its string form is the file prefix.
      # metric_name, name - first two components of the storage key.
      # labels            - label name => label value; must respond to
      #                     `keys` and `values_at`.
      # multiprocess_mode - appended to the file prefix (after an
      #                     underscore) when type is :gauge, ignored
      #                     otherwise.
      def initialize(type, metric_name, name, labels, multiprocess_mode = '')
        prefix = type.to_s
        prefix += "_#{multiprocess_mode}" if type == :gauge

        @file_prefix = prefix
        @metric_name = metric_name
        @name = name
        @labels = labels
        @mutex = Mutex.new
        # Never a real pid, so the instance counts as stale until
        # initialize_file has run.
        @pid = -1
        initialize_file
      end

      # Adds amount to the value, writes the result to the file and
      # returns it.
      def increment(amount = 1)
        with_current_file do
          @value += amount
          write_value(@key, @value)
          @value
        end
      end

      # Subtracts amount from the value; see #increment.
      def decrement(amount = 1)
        increment(-amount)
      end

      # Replaces the value, writes it to the file and returns it.
      def set(value)
        with_current_file do
          @value = value
          write_value(@key, @value)
          @value
        end
      end

      # Returns the cached value, re-reading it from the file first when
      # the pid has changed.
      def get
        with_current_file { @value }
      end

      # True when this instance was last initialized under another pid.
      def pid_changed?
        Process.pid != @pid
      end

      # Re-runs file initialization for this instance. With check_pid set
      # this is a no-op unless the pid changed; pass false to force it.
      #
      # method needs to be run in VALUE_LOCK mutex
      def unsafe_reinitialize_file(check_pid = true)
        return if check_pid && !pid_changed?

        unsafe_initialize_file
      end

      # Drops the file registry unconditionally and re-initializes every
      # live MmapedValue, all under VALUE_LOCK.
      def self.reset_and_reinitialize
        VALUE_LOCK.synchronize do
          @@pid = Process.pid
          @@files = {}
          ObjectSpace.each_object(MmapedValue) do |value|
            value.unsafe_reinitialize_file(false)
          end
        end
      end

      # Drops the file registry if it was built under another pid.
      # Does not take VALUE_LOCK itself.
      def self.reset_on_pid_change
        return unless pid_changed?

        @@pid = Process.pid
        @@files = {}
      end

      # Under VALUE_LOCK: drops the registry if the pid changed, then lets
      # every live MmapedValue re-initialize itself if its own pid is
      # stale.
      def self.reinitialize_on_pid_change
        VALUE_LOCK.synchronize do
          reset_on_pid_change
          ObjectSpace.each_object(MmapedValue) do |value|
            value.unsafe_reinitialize_file
          end
        end
      end

      # True when the class-level registry was built under another pid.
      def self.pid_changed?
        Process.pid != @@pid
      end

      # Always true for this value implementation.
      def self.multiprocess
        true
      end

      private

      # Runs the block under this instance's mutex, after re-initializing
      # the file if the pid changed. Returns the block's result.
      def with_current_file
        @mutex.synchronize do
          initialize_file if pid_changed?
          yield
        end
      end

      # Locked wrapper around unsafe_initialize_file.
      def initialize_file
        VALUE_LOCK.synchronize { unsafe_initialize_file }
      end

      # Binds this instance to the dict registered for its file prefix,
      # opening the file first if no dict is registered, then rebuilds the
      # key and re-reads the value. Must run under VALUE_LOCK.
      def unsafe_initialize_file
        self.class.reset_on_pid_change
        @pid = Process.pid

        unless @@files.key?(@file_prefix)
          # Release the dict this instance was bound to before a
          # replacement is opened.
          @file.close unless @file.nil?
          mmaped_file = Helper::MmapedFile.open_exclusive_file(@file_prefix)
          @@files[@file_prefix] = MmapedDict.new(mmaped_file)
        end

        @file = @@files[@file_prefix]
        @key = rebuild_key
        @value = read_value(@key)
      end

      # Storage key: a JSON array of metric name, name, the sorted label
      # names and the label values in that same order.
      def rebuild_key
        label_names = @labels.keys.sort
        label_values = @labels.values_at(*label_names)
        [@metric_name, @name, label_names, label_values].to_json
      end

      # Writes val under key. A StandardError is logged (warn plus
      # backtrace at debug level) instead of being raised.
      def write_value(key, val)
        @file.write_value(key, val)
      rescue StandardError => error
        Prometheus::Client.logger.warn("writing value to #{@file.path} failed with #{error}")
        Prometheus::Client.logger.debug(error.backtrace.join("\n"))
      end

      # Reads the value stored under key. A StandardError is logged and 0
      # is returned in its place.
      def read_value(key)
        @file.read_value(key)
      rescue StandardError => error
        Prometheus::Client.logger.warn("reading value from #{@file.path} failed with #{error}")
        Prometheus::Client.logger.debug(error.backtrace.join("\n"))
        0
      end
    end
  end
end
|