1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
|
module Celluloid
# Celluloid::Future objects allow methods and blocks to run in the
# background, their values requested later
class Future
def self.new(*args, &block)
return super unless block
future = new
# task = Thread.current[:celluloid_task]
# actor = Thread.current[:celluloid_actor]
Internals::ThreadHandle.new(Celluloid.actor_system, :future) do
begin
# Thread.current[:celluloid_task] = task
# Thread.current[:celluloid_actor] = actor
call = Call::Sync.new(future, :call, args)
call.dispatch(block)
rescue
# Exceptions in blocks will get raised when the value is retrieved
end
end
future
end
attr_reader :address
def initialize(&block)
@address = Celluloid.uuid
@mutex = Mutex.new
@ready = false
@result = nil
@forwards = nil
@cancelled = false
if block
@call = Call::Sync.new(self, :call, args)
Celluloid.internal_pool.get do
begin
@call.dispatch(block)
rescue
# Exceptions in blocks will get raised when the value is retrieved
end
end
else
@call = nil
end
end
# Execute the given method in future context
def execute(receiver, method, args, block)
@mutex.synchronize do
raise "already calling" if @call
@call = Call::Sync.new(self, method, args, block)
end
receiver << @call
end
# Check if this future has a value yet
def ready?
@ready
end
# Obtain the value for this Future
def value(timeout = nil)
ready = result = nil
begin
@mutex.lock
if @ready
ready = true
result = @result
else
case @forwards
when Array
@forwards << Celluloid.mailbox
when NilClass
@forwards = Celluloid.mailbox
else
@forwards = [@forwards, Celluloid.mailbox]
end
end
ensure
@mutex.unlock
end
unless ready
result = Celluloid.receive(timeout) do |msg|
msg.is_a?(Future::Result) && msg.future == self
end
end
if result
result.respond_to?(:value) ? result.value : result
else
raise TimedOut, "Timed out"
end
end
alias call value
# Signal this future with the given result value
def signal(value)
return if @cancelled
result = Result.new(value, self)
@mutex.synchronize do
raise "the future has already happened!" if @ready
if @forwards
@forwards.is_a?(Array) ? @forwards.each { |f| f << result } : @forwards << result
end
@result = result
@ready = true
end
end
alias << signal
def cancel(error)
response = Internals::Response::Error.new(@call, error)
signal response
@mutex.synchronize do
@cancelled = true
end
end
# Inspect this Celluloid::Future
alias inspect to_s
# Wrapper for result values to distinguish them in mailboxes
class Result
attr_reader :future
def initialize(result, future)
@result = result
@future = future
end
def value
@result.value
end
end
end
end
|