1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
|
module Concurrent
module Collection
# @!visibility private
# @!macro ruby_timeout_queue
class RubyTimeoutQueue < ::Queue
def initialize(*args)
if RUBY_VERSION >= '3.2'
raise "#{self.class.name} is not needed on Ruby 3.2 or later, use ::Queue instead"
end
super(*args)
@mutex = Mutex.new
@cond_var = ConditionVariable.new
end
def push(obj)
@mutex.synchronize do
super(obj)
@cond_var.signal
end
end
alias_method :enq, :push
alias_method :<<, :push
def pop(non_block = false, timeout: nil)
if non_block && timeout
raise ArgumentError, "can't set a timeout if non_block is enabled"
end
if non_block
super(true)
elsif timeout
@mutex.synchronize do
deadline = Concurrent.monotonic_time + timeout
while (now = Concurrent.monotonic_time) < deadline && empty?
@cond_var.wait(@mutex, deadline - now)
end
begin
return super(true)
rescue ThreadError
# still empty
nil
end
end
else
super(false)
end
end
alias_method :deq, :pop
alias_method :shift, :pop
end
private_constant :RubyTimeoutQueue
end
end
|