1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
|
module Concurrent
  module Synchronization

    # Lockable object for Rubinius. Mutual exclusion is obtained from
    # `Rubinius.synchronize` / `Rubinius.lock` on the object itself, and
    # waiting is built from one `Rubinius::Channel` per waiting thread, kept
    # in a FIFO array.
    #
    # @!visibility private
    # @!macro internal_implementation_note
    class RbxLockableObject < AbstractLockableObject
      safe_initialization!

      # Creates the (initially empty) waiter queue and clears the owner marker.
      #
      # @param defaults [Array] forwarded unchanged to the superclass initializer
      def initialize(*defaults)
        super(*defaults)
        @__Waiters__ = []
        @__owner__   = nil
      end

      # Gives an object produced by `dup`/`clone` its own lock state.
      #
      # Instance variables are copied by reference, so without this hook the
      # copy would share the original's `@__Waiters__` array (signals sent to
      # one object would wake threads waiting on the other) and would inherit
      # `@__owner__`, letting `synchronize` on the copy skip locking when the
      # copy was made from inside a synchronized section.
      #
      # @param other [RbxLockableObject] the object being copied
      def initialize_copy(other)
        super
        @__Waiters__ = []
        @__owner__   = nil
      end

      protected

      # Runs the block while holding this object's lock.
      #
      # When the current thread is already recorded as the owner the block is
      # yielded to directly, without locking again.
      #
      # @yield the critical section
      # @return [Object] the value returned by the block
      def synchronize(&block)
        if @__owner__ == Thread.current
          yield
        else
          result = nil
          Rubinius.synchronize(self) do
            begin
              @__owner__ = Thread.current
              result     = yield
            ensure
              # Always drop the owner marker, even if the block raised.
              @__owner__ = nil
            end
          end
          result
        end
      end

      # Releases the lock, blocks on a fresh channel until it receives a value
      # or the timeout expires, then re-acquires the lock before returning.
      #
      # Expects the caller to already hold the lock, since it unlocks before
      # blocking.
      #
      # @param timeout [Numeric, nil] forwarded to
      #   `Rubinius::Channel#receive_timeout`; nil presumably means no time
      #   limit -- confirm against the Rubinius documentation
      # @return [self]
      def ns_wait(timeout = nil)
        wchan = Rubinius::Channel.new

        begin
          @__Waiters__.push wchan
          Rubinius.unlock(self)
          signaled = wchan.receive_timeout timeout
        ensure
          Rubinius.lock(self)

          # `delete` returns nil when our channel is no longer queued, i.e. a
          # signaller already shifted it off the queue.
          if !signaled && !@__Waiters__.delete(wchan)
            # we timed out, but got signaled afterwards,
            # so pass that signal on to the next waiter
            @__Waiters__.shift << true unless @__Waiters__.empty?
          end
        end

        self
      end

      # Removes the oldest queued channel, if any, and sends `true` to it.
      #
      # @return [self]
      def ns_signal
        @__Waiters__.shift << true unless @__Waiters__.empty?
        self
      end

      # Removes every queued channel in FIFO order and sends `true` to each.
      #
      # @return [self]
      def ns_broadcast
        @__Waiters__.shift << true until @__Waiters__.empty?
        self
      end
    end
  end
end
|