1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
|
# frozen_string_literal: true
require "sentry/threaded_periodic_worker"
require "sentry/envelope"
module Sentry
# TelemetryEventBuffer is a base class for buffering telemetry events (logs, metrics, etc.)
# and sending them to Sentry in a single envelope.
#
# This is used internally by the `Sentry::Client`.
#
# @!visibility private
class TelemetryEventBuffer < ThreadedPeriodicWorker
FLUSH_INTERVAL = 5 # seconds
# @!visibility private
attr_reader :pending_items, :envelope_type, :data_category, :thread
def initialize(configuration, client, event_class:, max_items:, max_items_before_drop:, envelope_type:, envelope_content_type:, before_send:)
super(configuration.sdk_logger, FLUSH_INTERVAL)
@client = client
@dsn = configuration.dsn
@debug = configuration.debug
@event_class = event_class
@max_items = max_items
@max_items_before_drop = max_items_before_drop
@envelope_type = envelope_type
@data_category = Sentry::Envelope::Item.data_category(@envelope_type)
@envelope_content_type = envelope_content_type
@before_send = before_send
@pending_items = []
@mutex = Mutex.new
log_debug("[#{self.class}] Initialized buffer with max_items=#{@max_items}, flush_interval=#{FLUSH_INTERVAL}s")
end
def flush
@mutex.synchronize do
return if empty?
log_debug("[#{self.class}] flushing #{size} #{@event_class}")
send_items
end
self
end
alias_method :run, :flush
def add_item(item)
@mutex.synchronize do
return unless ensure_thread
if size >= @max_items_before_drop
log_debug("[#{self.class}] exceeded max capacity, dropping event")
@client.transport.record_lost_event(:queue_overflow, @data_category)
else
@pending_items << item
end
send_items if size >= @max_items
end
self
end
def empty?
@pending_items.empty?
end
def size
@pending_items.size
end
def clear!
@pending_items.clear
end
private
def send_items
envelope = Envelope.new(
event_id: Sentry::Utils.uuid,
sent_at: Sentry.utc_now.iso8601,
dsn: @dsn,
sdk: Sentry.sdk_meta
)
discarded_count = 0
envelope_items = []
if @before_send
@pending_items.each do |item|
processed_item = @before_send.call(item)
if processed_item
envelope_items << processed_item.to_h
else
discarded_count += 1
end
end
else
envelope_items = @pending_items.map(&:to_h)
end
unless discarded_count.zero?
@client.transport.record_lost_event(:before_send, @data_category, num: discarded_count)
end
return if envelope_items.empty?
envelope.add_item(
{
type: @envelope_type,
item_count: envelope_items.size,
content_type: @envelope_content_type
},
{ items: envelope_items }
)
@client.send_envelope(envelope)
rescue => e
log_error("[#{self.class}] Failed to send #{@event_class}", e, debug: @debug)
ensure
clear!
end
end
end
|