1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
|
# frozen_string_literal: true
class RedisMigrationWorker
include ApplicationWorker
idempotent!
data_consistency :delayed
feature_category :redis
urgency :throttled
loggable_arguments 0
SCAN_START_STOP = '0'
def perform(job_class_name, cursor, options = {})
migrator = self.class.fetch_migrator!(job_class_name)
scan_size = options[:scan_size] || 1000
deadline = Time.now.utc + 3.minutes
while Time.now.utc < deadline
cursor, keys = migrator.redis.scan(cursor, match: migrator.scan_match_pattern, count: scan_size)
migrator.perform(keys) if keys.any?
sleep(0.01)
break if cursor == SCAN_START_STOP
end
self.class.perform_async(job_class_name, cursor, options) unless cursor == SCAN_START_STOP
end
class << self
def fetch_migrator!(job_class_name)
job_class = "Gitlab::BackgroundMigration::Redis::#{job_class_name}".safe_constantize
raise NotImplementedError, "#{job_class_name} does not exist" if job_class.nil?
job_class.new
end
end
end
|