1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
|
class Rollout
module Logging
def self.extended(rollout)
options = rollout.options[:logging]
options = options.is_a?(Hash) ? options.dup : {}
options[:storage] ||= rollout.storage
logger = Logger.new(**options)
rollout.add_observer(logger, :log)
rollout.define_singleton_method(:logging) do
logger
end
end
class Event
attr_reader :feature, :name, :data, :context, :created_at
def self.from_raw(value, score)
hash = JSON.parse(value, symbolize_names: true)
new(**hash.merge(created_at: Time.at(-score.to_f / 1_000_000)))
end
def initialize(feature: nil, name:, data:, context: {}, created_at:)
@feature = feature
@name = name
@data = data
@context = context
@created_at = created_at
end
def timestamp
(@created_at.to_f * 1_000_000).to_i
end
def serialize
JSON.dump(
feature: @feature,
name: @name,
data: @data,
context: @context,
)
end
def ==(other)
feature == other.feature \
&& name == other.name \
&& data == other.data \
&& created_at == other.created_at
end
end
class Logger
def initialize(storage: nil, history_length: 50, global: false)
@history_length = history_length
@storage = storage
@global = global
end
def updated_at(feature_name)
storage_key = events_storage_key(feature_name)
_, score = @storage.zrange(storage_key, 0, 0, with_scores: true).first
Time.at(-score.to_f / 1_000_000) if score
end
def last_event(feature_name)
storage_key = events_storage_key(feature_name)
value = @storage.zrange(storage_key, 0, 0, with_scores: true).first
Event.from_raw(*value) if value
end
def events(feature_name)
storage_key = events_storage_key(feature_name)
@storage
.zrange(storage_key, 0, -1, with_scores: true)
.map { |v| Event.from_raw(*v) }
.reverse
end
def global_events
@storage
.zrange(global_events_storage_key, 0, -1, with_scores: true)
.map { |v| Event.from_raw(*v) }
.reverse
end
def delete(feature_name)
storage_key = events_storage_key(feature_name)
@storage.del(storage_key)
end
def update(before, after)
before_hash = before.to_hash
before_hash.delete(:data).each do |k, v|
before_hash["data.#{k}"] = v
end
after_hash = after.to_hash
after_hash.delete(:data).each do |k, v|
after_hash["data.#{k}"] = v
end
keys = before_hash.keys | after_hash.keys
change = { before: {}, after: {} }
changed_count = 0
keys.each do |key|
next if before_hash[key] == after_hash[key]
change[:before][key] = before_hash[key]
change[:after][key] = after_hash[key]
changed_count += 1
end
return if changed_count == 0
event = Event.new(
feature: after.name,
name: :update,
data: change,
context: current_context,
created_at: Time.now,
)
storage_key = events_storage_key(after.name)
@storage.zadd(storage_key, -event.timestamp, event.serialize)
@storage.zremrangebyrank(storage_key, @history_length, -1)
if @global
@storage.zadd(global_events_storage_key, -event.timestamp, event.serialize)
@storage.zremrangebyrank(global_events_storage_key, @history_length, -1)
end
end
def log(event, *args)
return unless logging_enabled?
unless respond_to?(event)
raise ArgumentError, "Invalid log event: #{event}"
end
expected_arity = method(event).arity
unless args.count == expected_arity
raise(
ArgumentError,
"Invalid number of arguments for event '#{event}': expected #{expected_arity} but got #{args.count}",
)
end
public_send(event, *args)
end
CONTEXT_THREAD_KEY = :rollout_logging_context
WITHOUT_THREAD_KEY = :rollout_logging_disabled
def with_context(context)
raise ArgumentError, "context must be a Hash" unless context.is_a?(Hash)
raise ArgumentError, "block is required" unless block_given?
Thread.current[CONTEXT_THREAD_KEY] = context
yield
ensure
Thread.current[CONTEXT_THREAD_KEY] = nil
end
def current_context
Thread.current[CONTEXT_THREAD_KEY] || {}
end
def without
Thread.current[WITHOUT_THREAD_KEY] = true
yield
ensure
Thread.current[WITHOUT_THREAD_KEY] = nil
end
def logging_enabled?
!Thread.current[WITHOUT_THREAD_KEY]
end
private
def global_events_storage_key
"feature:_global_:logging:events"
end
def events_storage_key(feature_name)
"feature:#{feature_name}:logging:events"
end
def current_timestamp
(Time.now.to_f * 1_000_000).to_i
end
end
end
end
|