1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
|
# frozen_string_literal: true
module Sentry
  # Collects per-bucket session counters and ships them to the client as a
  # single envelope containing one 'sessions' item.
  #
  # Counters are grouped by each session's +aggregation_key+ and incremented
  # by session status; they are reset after every successful call to #flush.
  class SessionFlusher < ThreadedPeriodicWorker
    # Interval forwarded to ThreadedPeriodicWorker.
    # NOTE(review): presumably expressed in seconds — confirm against the parent class.
    FLUSH_INTERVAL = 60

    # @param configuration [#logger, #release, #environment] SDK configuration
    # @param client [#capture_envelope] receiver of the aggregated envelope
    def initialize(configuration, client)
      super(configuration.logger, FLUSH_INTERVAL)

      @client = client
      @pending_aggregates = {}
      @release = configuration.release
      @environment = configuration.environment

      # Without a release every add_session call is a no-op, so say so up front.
      log_debug("[Sessions] Sessions won't be captured without a valid release") unless @release
    end

    # Sends the currently pending aggregates (if any) to the client and then
    # starts over with an empty set of counters.
    #
    # @return [Hash, nil] the fresh empty hash, or nil when nothing was pending
    def flush
      return if @pending_aggregates.empty?

      @client.capture_envelope(pending_envelope)
      @pending_aggregates = {}
    end

    # NOTE(review): `run` is presumably the hook ThreadedPeriodicWorker invokes
    # on every tick — confirm against the parent class.
    alias_method :run, :flush

    # Counts one session in the bucket identified by its aggregation key.
    #
    # The session is ignored when no release is configured, when
    # +ensure_thread+ returns a falsy value, or when its status is not one of
    # Session::AGGREGATE_STATUSES.
    #
    # @param session [#status, #aggregation_key]
    # @return [Integer, nil] the updated counter, or nil when the session was ignored
    def add_session(session)
      return unless @release
      return unless ensure_thread

      status = session.status
      return unless Session::AGGREGATE_STATUSES.include?(status)

      key = session.aggregation_key
      counters = (@pending_aggregates[key] ||= init_aggregates(key))
      counters[status] += 1
    end

    private

    # Builds a zeroed counter hash for a new bucket.
    #
    # @param aggregation_key [#iso8601] bucket identifier
    # @return [Hash] +:started+ plus one zero entry per aggregate status
    def init_aggregates(aggregation_key)
      { started: aggregation_key.iso8601 }.tap do |counters|
        Session::AGGREGATE_STATUSES.each { |status| counters[status] = 0 }
      end
    end

    # Wraps all pending aggregates into a new envelope with one 'sessions' item.
    #
    # @return [Envelope]
    def pending_envelope
      Envelope.new.tap do |envelope|
        envelope.add_item(
          { type: 'sessions' },
          { attrs: attrs, aggregates: @pending_aggregates.values }
        )
      end
    end

    # Attributes shared by every aggregate in the payload.
    #
    # @return [Hash]
    def attrs
      { release: @release, environment: @environment }
    end
  end
end
|