1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265
|
# frozen_string_literal: true
# Add capabilities to increment a numeric model attribute efficiently by
# using Redis and flushing the increments asynchronously to the database
# after a period of time (10 minutes).
# When an attribute is incremented by a value, the increment is added
# to a Redis key. Then, FlushCounterIncrementsWorker will execute
# `commit_increment!` which removes increments from Redis for a
# given model attribute and updates the values in the database.
#
# @example:
#
# class ProjectStatistics
# include CounterAttribute
#
# counter_attribute :commit_count
# counter_attribute :storage_size
# end
#
# It's possible to define a conditional counter attribute. You need to pass a proc
# that must accept a single argument, the object instance on which this concern is
# included.
#
# @example:
#
# class ProjectStatistics
# include CounterAttribute
#
# counter_attribute :conditional_one, if: -> { |object| object.use_counter_attribute? }
# end
#
# The `counter_attribute` by default will return last persisted value.
# It's possible to always return accurate (real) value instead by using `returns_current: true`.
# While doing this the `counter_attribute` will overwrite attribute accessor to fetch
# the buffered information added to the last persisted value. This will incur cost a Redis call per attribute fetched.
#
# @example:
#
# class ProjectStatistics
# include CounterAttribute
#
# counter_attribute :commit_count, returns_current: true
# end
#
# in that case
# model.commit_count => persisted value + buffered amount to be added
#
# To increment the counter we can use the method:
# increment_amount(:commit_count, 3)
#
# This method would determine whether it would increment the counter using Redis,
# or fallback to legacy increment on ActiveRecord counters.
#
# It is possible to register callbacks to be executed after increments have
# been flushed to the database. Callbacks are not executed if there are no increments
# to flush.
#
# counter_attribute_after_commit do |statistic|
# Namespaces::ScheduleAggregationWorker.perform_async(statistic.namespace_id)
# end
#
module CounterAttribute
extend ActiveSupport::Concern
include AfterCommitQueue
include Gitlab::ExclusiveLeaseHelpers
include Gitlab::Utils::StrongMemoize
class_methods do
def counter_attribute(attribute, if: nil, returns_current: false)
counter_attributes << {
attribute: attribute,
if_proc: binding.local_variable_get(:if), # can't read `if` directly
returns_current: returns_current
}
if returns_current
define_method(attribute) do
current_counter(attribute)
end
end
define_method("increment_#{attribute}") do |amount|
increment_amount(attribute, amount)
end
end
def counter_attributes
@counter_attributes ||= []
end
def after_commit_callbacks
@after_commit_callbacks ||= []
end
# perform registered callbacks after increments have been committed to the database
def counter_attribute_after_commit(&callback)
after_commit_callbacks << callback
end
end
def counter_attribute_enabled?(attribute)
counter_attribute = self.class.counter_attributes.find { |registered| registered[:attribute] == attribute }
return false unless counter_attribute
return true unless counter_attribute[:if_proc]
counter_attribute[:if_proc].call(self)
end
def counter(attribute)
strong_memoize_with(:counter, attribute) do
# This needs #to_sym because attribute could come from a Sidekiq param,
# which would be a string.
build_counter_for(attribute.to_sym)
end
end
def increment_amount(attribute, amount)
counter = Gitlab::Counters::Increment.new(amount: amount)
increment_counter(attribute, counter)
end
def current_counter(attribute)
read_attribute(attribute) + counter(attribute).get
end
def increment_counter(attribute, increment)
return if increment.amount == 0
run_after_commit_or_now do
new_value = counter(attribute).increment(increment)
log_increment_counter(attribute, increment, new_value)
end
end
def bulk_increment_counter(attribute, increments)
run_after_commit_or_now do
new_value = counter(attribute).bulk_increment(increments)
log_bulk_increment_counter(attribute, increments, new_value)
end
end
def update_counters(increments)
self.class.update_counters(id, increments)
end
def update_counters_with_lease(increments)
detect_race_on_record(log_fields: { caller: __method__, attributes: increments.keys }) do
update_counters(increments)
end
end
def initiate_refresh!(attribute)
raise ArgumentError, %(attribute "#{attribute}" cannot be refreshed) unless counter_attribute_enabled?(attribute)
counter(attribute).initiate_refresh!
log_clear_counter(attribute)
end
def finalize_refresh(attribute)
raise ArgumentError, %(attribute "#{attribute}" cannot be refreshed) unless counter_attribute_enabled?(attribute)
counter(attribute).finalize_refresh
end
def execute_after_commit_callbacks
self.class.after_commit_callbacks.each do |callback|
callback.call(self.reset)
end
end
private
def build_counter_for(attribute)
raise ArgumentError, %(attribute "#{attribute}" does not exist) unless has_attribute?(attribute)
return legacy_counter(attribute) unless counter_attribute_enabled?(attribute)
buffered_counter(attribute)
end
def legacy_counter(attribute)
Gitlab::Counters::LegacyCounter.new(self, attribute)
end
def buffered_counter(attribute)
Gitlab::Counters::BufferedCounter.new(self, attribute)
end
def database_lock_key
"project:{#{project_id}}:#{self.class}:#{id}"
end
# This method uses a lease to monitor access to the model row.
# This is needed to detect concurrent attempts to increment columns,
# which could result in a race condition.
#
# As the purpose is to detect and warn concurrent attempts,
# it falls back to direct update on the row if it fails to obtain the lease.
#
# It does not guarantee that there will not be any concurrent updates.
def detect_race_on_record(log_fields: {})
# Ensure attributes is always an array before we log
log_fields[:attributes] = Array(log_fields[:attributes])
Gitlab::AppLogger.info(
message: 'Acquiring lease for project statistics update',
model: self.class.name,
model_id: id,
project_id: project.id,
**log_fields,
**Gitlab::ApplicationContext.current
)
in_lock(database_lock_key, retries: 0) do
yield
end
rescue Gitlab::ExclusiveLeaseHelpers::FailedToObtainLockError
Gitlab::AppLogger.warn(
message: 'Concurrent project statistics update detected',
model: self.class.name,
model_id: id,
project_id: project.id,
**log_fields,
**Gitlab::ApplicationContext.current
)
yield
end
def log_increment_counter(attribute, increment, new_value)
payload = Gitlab::ApplicationContext.current.merge(
message: 'Increment counter attribute',
attribute: attribute,
project_id: project_id,
increment: increment.amount,
ref: increment.ref,
new_counter_value: new_value,
current_db_value: read_attribute(attribute)
)
Gitlab::AppLogger.info(payload)
end
def log_bulk_increment_counter(attribute, increments, new_value)
if Feature.enabled?(:split_log_bulk_increment_counter, type: :ops)
increments.each do |increment|
log_increment_counter(attribute, increment, new_value)
end
else
log_increment_counter(attribute, Gitlab::Counters::Increment.new(amount: increments.sum(&:amount)), new_value)
end
end
def log_clear_counter(attribute)
payload = Gitlab::ApplicationContext.current.merge(
message: 'Clear counter attribute',
attribute: attribute,
project_id: project_id
)
Gitlab::AppLogger.info(payload)
end
end
|