1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
|
# frozen_string_literal: true
module Sentry
  # Accumulates session counts in memory and periodically ships them as one
  # "sessions" envelope through the client's transport.
  #
  # A background thread is started lazily by {#add_session} and calls {#flush}
  # every {FLUSH_INTERVAL} seconds. Once {#kill} has been called (or the
  # thread could not be created) the flusher stays shut down: no new thread is
  # spawned and further sessions are dropped.
  #
  # NOTE(review): {#flush} runs on the background thread while {#add_session}
  # is invoked by callers, and nothing here locks @pending_aggregates —
  # confirm whether callers need stronger guarantees.
  class SessionFlusher
    include LoggingHelper

    # Seconds the background thread sleeps between two flushes.
    FLUSH_INTERVAL = 60

    # @param configuration [#release, #environment, #logger] source of the
    #   release/environment attributes attached to every envelope
    # @param client [#transport] client whose transport sends the envelope
    def initialize(configuration, client)
      @thread = nil
      # Set once the flusher is shut down; prevents the thread from respawning.
      @exited = false
      @client = client
      @pending_aggregates = {}
      @release = configuration.release
      @environment = configuration.environment
      # Presumably consumed by LoggingHelper#log_debug — verify against helper.
      @logger = configuration.logger

      log_debug("[Sessions] Sessions won't be captured without a valid release") unless @release
    end

    # Sends all buffered aggregates as one envelope and resets the buffer.
    # Does nothing when no aggregates are pending.
    #
    # The envelope is built synchronously; the actual send is handed to
    # Sentry.background_worker.
    def flush
      return if @pending_aggregates.empty?

      envelope = pending_envelope

      Sentry.background_worker.perform do
        @client.transport.send_envelope(envelope)
      end

      @pending_aggregates = {}
    end

    # Records one session in the bucket identified by its aggregation key.
    #
    # The session is ignored when no release is configured, when the flusher
    # has been shut down (see {#kill}) or its thread cannot be started, or
    # when the session status is not one of Session::AGGREGATE_STATUSES.
    #
    # @param session [#status, #aggregation_key]
    # @return [Integer, nil] the updated count for the session's status, or
    #   nil when the session was ignored
    def add_session(session)
      return unless @release
      # Without a running flusher thread nothing would ever ship the buffer.
      return unless ensure_thread
      return unless Session::AGGREGATE_STATUSES.include?(session.status)

      key = session.aggregation_key
      bucket = (@pending_aggregates[key] ||= init_aggregates(key))
      bucket[session.status] += 1
    end

    # Stops the background thread, if any, and marks the flusher as shut down
    # so that later {#add_session} calls do not start a new thread.
    def kill
      log_debug("Killing session flusher")

      @exited = true
      @thread&.kill
    end

    private

    # Builds an empty bucket: the start timestamp plus a zero counter for
    # every aggregate status.
    #
    # @param aggregation_key [#iso8601] presumably a Time — confirm with Session
    # @return [Hash]
    def init_aggregates(aggregation_key)
      initial = { started: aggregation_key.iso8601 }
      Session::AGGREGATE_STATUSES.each_with_object(initial) do |status, aggregates|
        aggregates[status] = 0
      end
    end

    # Wraps the currently buffered aggregates in a single "sessions" item.
    #
    # @return [Envelope]
    def pending_envelope
      envelope = Envelope.new

      header = { type: 'sessions' }
      payload = { attrs: attrs, aggregates: @pending_aggregates.values }

      envelope.add_item(header, payload)
      envelope
    end

    # Attributes shared by every aggregate in the envelope payload.
    def attrs
      { release: @release, environment: @environment }
    end

    # Makes sure the periodic flush thread is running.
    #
    # @return [Boolean] true when a live thread exists, false when the flusher
    #   has been shut down or the thread could not be created
    def ensure_thread
      return false if @exited
      return true if @thread&.alive?

      @thread = Thread.new do
        loop do
          sleep(FLUSH_INTERVAL)
          flush
        end
      end

      true
    rescue ThreadError
      # Thread creation can fail (e.g. resource exhaustion); recording a
      # session must not raise into the caller because of it.
      log_debug("[Sessions] Thread creation failed")
      @exited = true
      false
    end
  end
end
|