1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
|
# frozen_string_literal: true
require 'securerandom'
require 'zlib'
module WithAdvisoryLock
# Outcome of one attempt to run a block under an advisory lock.
#
# Pairs the "did we get the lock?" flag with whatever value the caller
# wants to carry alongside it (+false+ when none is given).
class Result
  # @param lock_was_acquired [Object] stored untouched and handed back by
  #   {#lock_was_acquired?}
  # @param result [Object] value exposed through {#result}; defaults to +false+
  def initialize(lock_was_acquired, result = false)
    @lock_was_acquired, @result = lock_was_acquired, result
  end

  # @return [Object] the flag given to the constructor, returned as-is
  def lock_was_acquired?
    @lock_was_acquired
  end

  # @return [Object] the value given to the constructor
  attr_reader :result
end
# Single shared Result meaning "the lock was not acquired". Its +result+ is
# the constructor default (+false+) because no block was run.
FAILED_TO_LOCK = Result.new(false)
# Entry kept on the lock stack: the lock string (+name+) together with the
# +shared+ flag it was requested with. Being a Struct, two entries with equal
# members compare equal.
LockStackItem = Struct.new(:name, :shared)
# Template for running a block while holding a named advisory lock.
#
# This class drives the acquire / yield / release cycle, the re-entrancy check
# and the retry-until-timeout loop. It never issues a lock query itself: it
# calls +try_lock+ and +release_lock+, neither of which is defined here, so a
# subclass has to supply them (presumably one per database adapter — confirm).
class Base
  attr_reader :connection, :lock_name, :timeout_seconds, :shared, :transaction, :disable_query_cache

  # @param connection [Object] stored as-is; this class only calls +uncached+
  #   on it, and only when +disable_query_cache+ is set
  # @param lock_name [Object] interpolated into the lock string
  # @param options [Hash, Object] an options hash, or — for any value that does
  #   not respond to +fetch+ — the timeout itself
  # @option options [Numeric, nil] :timeout_seconds (nil) +nil+ retries until
  #   the lock is acquired, +0+ makes exactly one attempt, anything else is
  #   the number of seconds to keep retrying
  # @option options [Boolean] :shared (false) stored and made part of the
  #   lock-stack entry
  # @option options [Boolean] :transaction (false) stored; not used by this class
  # @option options [Boolean] :disable_query_cache (false) run the block inside
  #   +connection.uncached+
  # NOTE(review): unknown keys are rejected by +assert_valid_keys+, which is not
  # defined in this file (presumably ActiveSupport's Hash extension, raising
  # ArgumentError — confirm).
  def initialize(connection, lock_name, options)
    options = { timeout_seconds: options } unless options.respond_to?(:fetch)
    options.assert_valid_keys :timeout_seconds, :shared, :transaction, :disable_query_cache

    @connection = connection
    @lock_name = lock_name
    @timeout_seconds = options.fetch(:timeout_seconds, nil)
    @shared = options.fetch(:shared, false)
    @transaction = options.fetch(:transaction, false)
    @disable_query_cache = options.fetch(:disable_query_cache, false)
  end

  # @return [String] the lock name prefixed with the value of the environment
  #   variable named by +LOCK_PREFIX_ENV+ (a constant defined outside this
  #   class); an unset variable contributes nothing. Memoized.
  def lock_str
    @lock_str ||= "#{ENV[LOCK_PREFIX_ENV]}#{lock_name}"
  end

  # @return [LockStackItem] memoized stack entry for this lock (string + shared flag)
  def lock_stack_item
    @lock_stack_item ||= LockStackItem.new(lock_str, shared)
  end

  # @return [Array<LockStackItem>] locks currently held by the running thread
  def self.lock_stack
    # access doesn't need to be synchronized as it is only accessed by the current thread.
    Thread.current[:with_advisory_lock_stack] ||= []
  end

  # Instance-level shortcut to the class-level stack.
  #
  # @return [Array<LockStackItem>]
  def lock_stack
    self.class.lock_stack
  end

  # @return [Boolean] whether an equal entry is already on the lock stack
  def already_locked?
    lock_stack.include? lock_stack_item
  end

  # Entry point: runs the block under the lock, wrapping it in
  # +connection.uncached+ when +disable_query_cache+ was requested.
  #
  # @return [Result] see {#lock_and_yield}
  def with_advisory_lock_if_needed(&block)
    return lock_and_yield(&block) unless disable_query_cache

    lock_and_yield { connection.uncached(&block) }
  end

  # Picks the locking strategy for the block.
  #
  # @return [Result] acquired-with-value, or +FAILED_TO_LOCK+
  def lock_and_yield(&block)
    if already_locked?
      # Re-entrant use: the lock is already on the stack, so just run the
      # block without locking (or releasing) a second time.
      Result.new(true, yield)
    elsif timeout_seconds == 0
      yield_with_lock(&block)
    else
      yield_with_lock_and_timeout(&block)
    end
  end

  # @param input [Object] a Numeric, or anything responding to +to_s+
  # @return [Integer] +input.to_i+ for numerics, otherwise the CRC-32 of
  #   +input.to_s+
  def stable_hashcode(input)
    if input.is_a? Numeric
      input.to_i
    else
      # Ruby MRI's String#hash is randomly seeded as of Ruby 1.9 so
      # make sure we use a deterministic hash.
      Zlib.crc32(input.to_s, 0)
    end
  end

  # Retries {#yield_with_lock} until it succeeds or the timeout elapses; with
  # a +nil+ timeout it retries indefinitely.
  #
  # The deadline is measured on the monotonic clock so that wall-clock
  # adjustments while waiting can neither cut the timeout short nor extend it.
  #
  # @return [Result] the successful attempt, or +FAILED_TO_LOCK+ on timeout
  def yield_with_lock_and_timeout(&block)
    give_up_at = monotonic_now + @timeout_seconds if @timeout_seconds
    while @timeout_seconds.nil? || monotonic_now < give_up_at
      attempt = yield_with_lock(&block)
      return attempt if attempt.lock_was_acquired?

      # Randomizing sleep time may help reduce contention.
      sleep(rand(0.05..0.15))
    end
    FAILED_TO_LOCK
  end

  # Makes a single +try_lock+ attempt. On success the lock is pushed on the
  # stack, the block (if any) is run, and the lock is popped and released
  # again — also when the block raises.
  #
  # @return [Result] acquired-with-value (+nil+ value without a block), or
  #   +FAILED_TO_LOCK+ when +try_lock+ is falsy
  def yield_with_lock
    if try_lock
      begin
        lock_stack.push(lock_stack_item)
        result = block_given? ? yield : nil
        Result.new(true, result)
      ensure
        lock_stack.pop
        release_lock
      end
    else
      FAILED_TO_LOCK
    end
  end

  # Prevent AR from caching results improperly
  #
  # @return [String] "t" followed by a fresh random hex string on every call
  def unique_column_name
    "t#{SecureRandom.hex}"
  end

  private

  # @return [Float] current reading of the monotonic clock, in seconds
  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end
end
|