# frozen_string_literal: true
module Sentry
module Metrics
class Aggregator < ThreadedPeriodicWorker
  # How often (in seconds) the background worker wakes up to flush.
  FLUSH_INTERVAL = 5
  # Metrics are bucketed into fixed 10-second windows.
  ROLLUP_IN_SECONDS = 10

  # this is how far removed from user code in the backtrace we are
  # when we record code locations
  DEFAULT_STACKLEVEL = 4

  KEY_SANITIZATION_REGEX = /[^a-zA-Z0-9_\-.]+/
  UNIT_SANITIZATION_REGEX = /[^a-zA-Z0-9_]+/
  TAG_KEY_SANITIZATION_REGEX = /[^a-zA-Z0-9_\-.\/]+/

  # Characters that must be escaped in tag values because they collide
  # with the statsd wire format's field separators (`|`, `,`) or are
  # control characters.
  TAG_VALUE_SANITIZATION_MAP = {
    "\n" => "\\n",
    "\r" => "\\r",
    "\t" => "\\t",
    "\\" => "\\\\",
    "|" => "\\u{7c}",
    "," => "\\u{2c}"
  }.freeze

  # Matches any single character that needs escaping per the map above.
  TAG_VALUE_SANITIZATION_REGEX = Regexp.union(TAG_VALUE_SANITIZATION_MAP.keys)

  # Maps the statsd short type code to the Metric implementation class.
  METRIC_TYPES = {
    c: CounterMetric,
    d: DistributionMetric,
    g: GaugeMetric,
    s: SetMetric
  }.freeze

  # exposed only for testing
  attr_reader :client, :thread, :buckets, :flush_shift, :code_locations

  def initialize(configuration, client)
    super(configuration.sdk_logger, FLUSH_INTERVAL)
    @client = client
    @before_emit = configuration.metrics.before_emit
    @enable_code_locations = configuration.metrics.enable_code_locations
    @stacktrace_builder = configuration.stacktrace_builder

    @default_tags = {}
    @default_tags["release"] = configuration.release if configuration.release
    @default_tags["environment"] = configuration.environment if configuration.environment

    # guards @buckets and @code_locations, which are touched from both
    # the caller's thread (add) and the flusher thread (run/flush)
    @mutex = Mutex.new

    # a nested hash of timestamp -> bucket keys -> Metric instance
    @buckets = {}

    # the flush interval needs to be shifted once per startup to create jittering
    @flush_shift = Random.rand * ROLLUP_IN_SECONDS

    # a nested hash of timestamp (start of day) -> meta keys -> frame
    @code_locations = {}
  end

  # Records a single metric data point into the current 10-second bucket.
  #
  # @param type [Symbol] one of METRIC_TYPES' keys (:c, :d, :g, :s)
  # @param key [String] the metric name
  # @param value [Object] the value to aggregate
  # @param unit [String] the unit of the value
  # @param tags [Hash] tags merged over the default release/environment tags
  # @param timestamp [Time, nil] defaults to now (UTC)
  # @param stacklevel [Integer, nil] backtrace depth for code locations
  # @return [void]
  def add(type,
          key,
          value,
          unit: "none",
          tags: {},
          timestamp: nil,
          stacklevel: nil)
    return unless ensure_thread
    # Hash#key? avoids allocating the keys array on every call
    return unless METRIC_TYPES.key?(type)

    updated_tags = get_updated_tags(tags)
    # a before_emit callback returning falsy drops the data point
    return if @before_emit && !@before_emit.call(key, updated_tags)

    timestamp ||= Sentry.utc_now

    # this is integer division and thus takes the floor of the division
    # and buckets into 10 second intervals
    bucket_timestamp = (timestamp.to_i / ROLLUP_IN_SECONDS) * ROLLUP_IN_SECONDS

    serialized_tags = serialize_tags(updated_tags)
    bucket_key = [type, key, unit, serialized_tags]

    added = @mutex.synchronize do
      record_code_location(type, key, unit, timestamp, stacklevel: stacklevel) if @enable_code_locations
      process_bucket(bucket_timestamp, bucket_key, type, value)
    end

    # for sets, we pass on if there was a new entry to the local gauge
    local_value = type == :s ? added : value
    process_span_aggregator(bucket_key, local_value)
  end

  # Drains ready buckets and recorded code locations and sends them to
  # the client as a single envelope. Does nothing when there is no data.
  #
  # @param force [Boolean] when true, flush all buckets regardless of age
  # @return [void]
  def flush(force: false)
    flushable_buckets = get_flushable_buckets!(force)
    code_locations = get_code_locations!
    return if flushable_buckets.empty? && code_locations.empty?

    envelope = Envelope.new

    unless flushable_buckets.empty?
      payload = serialize_buckets(flushable_buckets)
      envelope.add_item(
        { type: "statsd", length: payload.bytesize },
        payload
      )
    end

    unless code_locations.empty?
      # one metric_meta item per day bucket
      code_locations.each do |timestamp, locations|
        payload = serialize_locations(timestamp, locations)
        envelope.add_item(
          { type: "metric_meta", content_type: "application/json" },
          payload
        )
      end
    end

    @client.capture_envelope(envelope)
  end

  # the periodic worker invokes #run on every tick
  alias_method :run, :flush

  private

  # important to sort for key consistency
  def serialize_tags(tags)
    tags.flat_map do |k, v|
      if v.is_a?(Array)
        # array values expand into one pair per element under the same key
        v.map { |x| [k.to_s, x.to_s] }
      else
        [[k.to_s, v.to_s]]
      end
    end.sort
  end

  # Atomically removes and returns buckets ready to be flushed.
  # When +force+ is false, only buckets older than one rollup window
  # (plus the per-process jitter shift) are drained.
  def get_flushable_buckets!(force)
    @mutex.synchronize do
      if force
        flushable_buckets = @buckets
        @buckets = {}
        flushable_buckets
      else
        cutoff = Sentry.utc_now.to_i - ROLLUP_IN_SECONDS - @flush_shift
        flushable_buckets = @buckets.select { |k, _| k <= cutoff }
        @buckets.reject! { |k, _| k <= cutoff }
        flushable_buckets
      end
    end
  end

  # Atomically removes and returns all recorded code locations.
  def get_code_locations!
    @mutex.synchronize do
      code_locations = @code_locations
      @code_locations = {}
      code_locations
    end
  end

  # serialize buckets to statsd format
  def serialize_buckets(buckets)
    buckets.flat_map do |timestamp, timestamp_buckets|
      timestamp_buckets.map do |metric_key, metric|
        type, key, unit, tags = metric_key
        values = metric.serialize.join(":")
        sanitized_tags = tags.map { |k, v| "#{sanitize_tag_key(k)}:#{sanitize_tag_value(v)}" }.join(",")

        "#{sanitize_key(key)}@#{sanitize_unit(unit)}:#{values}|#{type}|\##{sanitized_tags}|T#{timestamp}"
      end
    end.join("\n")
  end

  # Builds the metric_meta payload for one start-of-day timestamp.
  def serialize_locations(timestamp, locations)
    mapping = locations.map do |meta_key, location|
      type, key, unit = meta_key
      mri = "#{type}:#{sanitize_key(key)}@#{sanitize_unit(unit)}"

      # note this needs to be an array but it really doesn't serve a purpose right now
      [mri, [location.merge(type: "location")]]
    end.to_h

    { timestamp: timestamp, mapping: mapping }
  end

  def sanitize_key(key)
    key.gsub(KEY_SANITIZATION_REGEX, "_")
  end

  def sanitize_unit(unit)
    unit.gsub(UNIT_SANITIZATION_REGEX, "")
  end

  def sanitize_tag_key(key)
    key.gsub(TAG_KEY_SANITIZATION_REGEX, "")
  end

  # gsub with a Hash replaces each match with the corresponding map value
  # literally (no backreference processing), matching a per-char lookup
  # without allocating an intermediate character array.
  def sanitize_tag_value(value)
    value.gsub(TAG_VALUE_SANITIZATION_REGEX, TAG_VALUE_SANITIZATION_MAP)
  end

  # Returns the current scope's transaction name, or nil when absent or
  # of low quality (e.g. a raw URL).
  def get_transaction_name
    scope = Sentry.get_current_scope
    return nil unless scope && scope.transaction_name
    return nil if scope.transaction_source_low_quality?

    scope.transaction_name
  end

  # Merges user tags over the default release/environment tags and adds
  # the transaction name when one is available.
  def get_updated_tags(tags)
    updated_tags = @default_tags.merge(tags)
    transaction_name = get_transaction_name
    updated_tags["transaction"] = transaction_name if transaction_name

    updated_tags
  end

  # Forwards the data point to the active span's local aggregator, if any.
  def process_span_aggregator(key, value)
    scope = Sentry.get_current_scope
    return nil unless scope && scope.span
    return nil if scope.transaction_source_low_quality?

    scope.span.metrics_local_aggregator.add(key, value)
  end

  # Adds +value+ to the bucket's metric (creating the metric on first
  # sight) and returns the weight delta, which for sets signals whether
  # the entry was new. Caller must hold @mutex.
  def process_bucket(timestamp, key, type, value)
    @buckets[timestamp] ||= {}

    if (metric = @buckets[timestamp][key])
      old_weight = metric.weight
      metric.add(value)
      metric.weight - old_weight
    else
      metric = METRIC_TYPES[type].new(value)
      @buckets[timestamp][key] = metric
      metric.weight
    end
  end

  # Records the calling code location once per (type, key, unit) per day.
  # Caller must hold @mutex.
  def record_code_location(type, key, unit, timestamp, stacklevel: nil)
    meta_key = [type, key, unit]
    start_of_day = Time.utc(timestamp.year, timestamp.month, timestamp.day).to_i

    @code_locations[start_of_day] ||= {}
    @code_locations[start_of_day][meta_key] ||= @stacktrace_builder.metrics_code_location(caller[stacklevel || DEFAULT_STACKLEVEL])
  end
end
end
end