1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
|
require 'thread'
# A write-once future for Rubinius: one thread publishes a result with
# #fulfill, and any number of threads may block in #value until it arrives.
class RBXFuture
  # Sentinel stored in the reference until the future is fulfilled. It is only
  # ever compared by identity, so no fulfilled value can be mistaken for it.
  PENDING = Object.new

  # Creates a future in the pending state.
  def initialize
    @Lock      = Mutex.new
    @Condition = ConditionVariable.new
    # reference to a value with volatile semantics
    @Value     = Rubinius::AtomicReference.new PENDING
    # protect against reordering
    Rubinius.memory_barrier
  end

  # Tells whether the future has been fulfilled.
  #
  # @param value [Object] a snapshot previously read from the reference;
  #   defaults to a fresh read
  # @return [Boolean] true unless +value+ is the PENDING sentinel itself
  def complete?(value = @Value.get)
    # Identity check made from the sentinel's side. `value != PENDING` would
    # dispatch to the fulfilled value's own #==, so a value whose #== answers
    # true for everything (or raises) would look pending forever.
    !PENDING.equal?(value)
  end

  # Returns the fulfilled value, blocking the calling thread until one is
  # available.
  #
  # @return [Object] the value passed to #fulfill
  def value
    # read only once
    result = @Value.get
    # check without synchronization
    return result if complete? result

    # critical section
    @Lock.synchronize do
      # Re-read after every wake-up; keep waiting while the reference still
      # holds the sentinel.
      until complete?(result = @Value.get)
        # blocks thread until it is broadcasted
        @Condition.wait @Lock
      end
    end
    result
  end

  # Stores +value+ as the result and wakes every thread waiting in #value.
  #
  # @param value [Object] the result to publish
  # @return [RBXFuture] self
  # @raise [RuntimeError] if the future has already been fulfilled
  def fulfill(value)
    @Lock.synchronize do
      raise 'already fulfilled' if complete?

      @Value.set value
      @Condition.broadcast
    end
    self
  end
end
|