1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
|
require 'concurrent_needed'
require 'concurrent/synchronization'
require 'concurrent/atomics'
# A minimal write-once future.
#
# The result lives in a volatile field so a reader can pick up a completed
# value with a single read and no locking. Readers that arrive before
# completion wait on a condition variable until #fulfill broadcasts.
class CRFuture < Concurrent::Synchronization::Object
  # Sentinel held in the volatile field until the future is fulfilled.
  PENDING = Object.new

  safe_initialization!

  # attr_volatile also defines reader and writer methods; both are made
  # private so the field can only change through #fulfill.
  private *attr_volatile(:volatile_value)

  # Creates a pending future.
  def initialize
    super
    # NOTE(review): the capitalized ivar names look like a marker for fields
    # that are never reassigned after construction -- confirm the convention.
    @Lock = Mutex.new
    @Condition = ConditionVariable.new
    self.volatile_value = PENDING
  end

  # Tells whether a result has been stored.
  #
  # The check is by object identity against the sentinel, invoked on the
  # sentinel itself, so a fulfilled value with a custom `==`/`!=` (one that
  # is overly permissive, or raises for foreign objects) cannot make the
  # future appear pending or blow up the check.
  #
  # @param value [Object] a previously read field value; defaults to a
  #   fresh read of the volatile field
  # @return [Boolean] true once the field holds anything but PENDING
  def complete?(value = volatile_value)
    !PENDING.equal?(value)
  end

  # Returns the result, blocking the calling thread until one is available.
  #
  # @return [Object] the value passed to #fulfill
  def value
    # Fast path: read the field only once and skip the lock when done.
    result = volatile_value
    return result if complete?(result)

    # Critical section: re-read under the lock so a fulfill that happened
    # between the fast-path read and acquiring the lock is not missed.
    @Lock.synchronize do
      # Loop rather than a single wait, so a wakeup without a stored
      # result simply goes back to waiting.
      until complete?(result = volatile_value)
        # Releases the lock and blocks until #fulfill broadcasts.
        @Condition.wait(@Lock)
      end
    end
    result
  end

  # Stores the result and wakes every thread blocked in #value.
  #
  # @param value [Object] the result to publish
  # @return [CRFuture] self
  # @raise [RuntimeError] if a result has already been stored
  def fulfill(value)
    @Lock.synchronize do
      raise 'already fulfilled' if complete?

      self.volatile_value = value
      @Condition.broadcast
    end
    self
  end
end
|