1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
|
require 'concurrent_needed'
require 'concurrent/synchronization'
require 'concurrent/atomics'
# A write-once future coordinated purely through compare-and-set.
#
# State lives in two atomic fields:
# * +atomic_value+ - the PENDING sentinel until #fulfill stores the result;
# * +head+         - the most recently registered waiter Node (nil when no
#                    thread has registered).
#
# Nodes do not link to each other. Each waiting thread keeps, in a local
# variable, the head it replaced when registering, and wakes that
# predecessor once it has woken up itself. #fulfill wakes the current head,
# so the wake-up is passed along from newest waiter to oldest.
class CRCasFuture < Concurrent::Synchronization::Object
  # Record of a single thread blocked in CRCasFuture#value.
  class Node < Concurrent::Synchronization::Object
    # Flag set to true by the waiting thread itself once it has left its
    # wait loop; whoever is waking this node polls it.
    attr_volatile(:awake)

    safe_initialization!

    # @param thread [Thread] the thread that is going to wait on this node
    def initialize(thread)
      super()
      # NOTE(review): the capitalised ivar presumably marks a field that is
      # written only here - confirm against the project's convention.
      @Thread = thread
      self.awake = false
    end

    # @return [Thread] the thread passed to the constructor
    def thread
      @Thread
    end
  end

  safe_initialization!

  # Sentinel stored in +atomic_value+ while no result has been set.
  PENDING = Object.new

  attr_atomic(:atomic_value)
  attr_atomic(:head)

  # Creates a future with no result and no registered waiters.
  def initialize
    super
    self.head = nil
    self.atomic_value = PENDING
  end

  # Tells whether a result has been stored.
  #
  # The check is an identity comparison evaluated on the sentinel, so it
  # never dispatches to the stored value's own +==+ / +!=+ (which a caller's
  # object may have overridden).
  #
  # @param value [Object] a snapshot to test; defaults to the current
  #   +atomic_value+
  # @return [Boolean] true unless +value+ is the PENDING sentinel
  def complete?(value = atomic_value)
    !PENDING.equal?(value)
  end

  # Returns the result, putting the calling thread to sleep until #fulfill
  # has stored one.
  #
  # @return [Object] the value given to #fulfill
  def value
    value = atomic_value
    return value if complete? value

    begin
      # Register as the new head. +head+ keeps the node that was replaced
      # (or nil); it is the predecessor this thread must wake later.
      # `while` is used deliberately: it does not open a new scope, so
      # +head+ and +node+ stay visible in the ensure clause below.
      while true
        head = self.head
        node = Node.new Thread.current
        break if compare_and_set_head head, node
      end

      until complete?(value = atomic_value)
        # may go to sleep even if completed, but it has a record by then
        # (the node is already registered, so the waker keeps calling
        # Thread#wakeup until this thread confirms via +awake+)
        sleep
      end

      value
    ensure
      # +node+ is nil only when this block was left before the first Node
      # was created; in that case no CAS was attempted, nothing was
      # registered, and there is nobody to confirm to or to wake.
      if node
        node.awake = true
        wakeup head
      end
    end
  end

  # Stores the result and starts waking the registered waiters.
  #
  # @param value [Object] the result to publish
  # @return [CRCasFuture] self
  # @raise [RuntimeError] 'already fulfilled' when a result was stored before
  def fulfill(value)
    if compare_and_set_atomic_value(PENDING, value)
      wakeup head
    else
      raise 'already fulfilled'
    end
    self
  end

  private

  # Keeps waking the node's thread until the node reports itself awake.
  #
  # @param node [Node, nil] the waiter to wake; nil is a no-op
  # @return [nil]
  def wakeup(node)
    return unless node

    while true
      break if node.awake

      # has to be confirmed: a single Thread#wakeup may arrive before the
      # target has actually gone to sleep, so repeat until +awake+ is set.
      node.thread.wakeup
      Thread.pass
    end
  end
end
|