1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
|
module Celluloid
class Task
# Tasks with a Thread backend
class Threaded < Task
# Run the given block within a task
def initialize(type, meta)
@resume_queue = Queue.new
@exception_queue = Queue.new
@yield_mutex = Mutex.new
@yield_cond = ConditionVariable.new
@thread = nil
super
end
def create
# TODO: move this to ActorSystem#get_thread (ThreadHandle inside Group::Pool)
thread = Internals::ThreadHandle.new(Thread.current[:celluloid_actor_system], :task) do
begin
ex = @resume_queue.pop
raise ex if ex.is_a?(TaskTerminated)
yield
rescue ::Exception => ex
@exception_queue << ex
ensure
@yield_mutex.synchronize do
@yield_cond.signal
end
end
end
@thread = thread
end
def signal
@yield_mutex.synchronize do
@yield_cond.signal
end
@resume_queue.pop
end
def deliver(value)
raise DeadTaskError, "cannot resume a dead task" unless @thread.alive?
@yield_mutex.synchronize do
@resume_queue.push(value)
@yield_cond.wait(@yield_mutex)
raise @exception_queue.pop until @exception_queue.empty?
end
rescue ThreadError
raise DeadTaskError, "cannot resume a dead task"
end
def backtrace
@thread.backtrace
end
end
end
end
|