1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
|
module Celluloid
  # Tasks are interruptable/resumable execution contexts used to run methods.
  #
  # This base class holds the bookkeeping shared by all tasks. It relies on
  # three primitives: +create+ (stubbed below, raises until overridden),
  # +signal+ and +deliver+ (called by #suspend and #resume but not defined in
  # this class; presumably supplied by concrete subclasses).
  class Task
    # Obtain the current task.
    #
    # Returns the task stored in the current thread's :celluloid_task slot.
    # Raises NotTaskError when that slot is empty.
    def self.current
      Thread.current[:celluloid_task] || raise(NotTaskError, "not within a task context")
    end

    # Suspend the running task, deferring to the scheduler.
    #
    # Shorthand for Task.current.suspend(status); see #suspend.
    def self.suspend(status)
      Task.current.suspend(status)
    end

    # type/meta are the values given to #initialize; status starts as :new and
    # becomes :running, whatever was passed to #suspend, or :dead.
    attr_reader :type, :meta, :status

    # chain_id is captured from Internals::CallChain at construction time.
    # guard_warnings switches #guard from raising to logging.
    attr_accessor :chain_id, :guard_warnings

    # Create a new task.
    #
    # type - stored as-is and reported in log messages and #inspect
    # meta - nil or a Hash-like object; :method_name is used for the thread
    #        label and :dangerous_suspend enables extra suspend logging
    #
    # The block given to the constructor is the task body. It is wrapped with
    # setup/teardown code and handed to #create.
    #
    # Raises NotActorError when the current thread has no :celluloid_actor.
    def initialize(type, meta)
      @type = type
      @meta = meta
      @status = :new
      @exclusive = false
      # A plain lookup is enough: reading the flag needs neither a copy of
      # meta nor a mutation of it.
      @dangerous_suspend = @meta ? @meta[:dangerous_suspend] : false
      @guard_warnings = false

      actor = Thread.current[:celluloid_actor]
      @chain_id = Internals::CallChain.current_id

      raise NotActorError, "can't create tasks outside of actors" unless actor
      guard "can't create tasks inside of tasks" if Thread.current[:celluloid_task]

      create do
        begin
          @status = :running
          actor.setup_thread
          name_current_thread thread_metadata
          Thread.current[:celluloid_task] = self
          Internals::CallChain.current_id = @chain_id
          actor.tasks << self
          yield
        rescue TaskTerminated
          # Task was explicitly terminated
        ensure
          # Runs on every exit path: restore the thread name, mark the task
          # dead and drop it from the actor's task list.
          name_current_thread nil
          @status = :dead
          actor.tasks.delete self
        end
      end
    end

    # Hook that receives the wrapped task body as a block.
    # This base implementation only raises; subclasses must override it.
    def create(&_block)
      raise "Implement #{self.class}#create"
    end

    # Suspend the current task, changing the status to the given argument.
    #
    # Returns whatever +signal+ returns once the task continues. If that value
    # is a Celluloid::Interruption it is raised instead of returned.
    #
    # Raises RuntimeError when the task is in exclusive mode or when called
    # for a task other than Task.current.
    def suspend(status)
      raise "Cannot suspend while in exclusive mode" if exclusive?
      raise "Cannot suspend a task from outside of itself" unless Task.current == self

      @status = status

      # !!! DO NOT INTRODUCE ADDITIONAL GLOBAL VARIABLES !!!
      # rubocop:disable Style/GlobalVars
      if $CELLULOID_DEBUG && @dangerous_suspend
        Internals::Logger.with_backtrace(caller[2...8]) do |logger|
          logger.warn "Dangerously suspending task: type=#{@type.inspect}, meta=#{@meta.inspect}, status=#{@status.inspect}"
        end
      end
      # rubocop:enable Style/GlobalVars

      value = signal
      @status = :running

      raise value if value.is_a?(Celluloid::Interruption)
      value
    end

    # Resume a suspended task, giving it a value to return if needed.
    #
    # The value is passed to +deliver+ while the task is not dead; for a dead
    # task a warning is logged instead. Always returns nil.
    def resume(value = nil)
      guard "Cannot resume a task from inside of a task" if Thread.current[:celluloid_task]

      if running?
        deliver(value)
      else
        # rubocop:disable Metrics/LineLength
        Internals::Logger.warn "Attempted to resume a dead task: type=#{@type.inspect}, meta=#{@meta.inspect}, status=#{@status.inspect}"
        # rubocop:enable Metrics/LineLength
      end

      nil
    end

    # Execute a code block in exclusive mode.
    #
    # Returns the block's value. A nested call simply yields, so only the
    # outermost call clears the flag when it finishes.
    def exclusive
      return yield if @exclusive

      begin
        @exclusive = true
        yield
      ensure
        @exclusive = false
      end
    end

    # Terminate this task.
    #
    # Resumes the task with a TaskTerminated exception carrying the caller's
    # backtrace.
    #
    # Raises RuntimeError when the task is in exclusive mode and
    # DeadTaskError when the task is already dead.
    def terminate
      raise "Cannot terminate an exclusive task" if exclusive?
      raise DeadTaskError, "task is already dead" unless running?

      # !!! DO NOT INTRODUCE ADDITIONAL GLOBAL VARIABLES !!!
      # rubocop:disable Style/GlobalVars
      if $CELLULOID_DEBUG
        Internals::Logger.with_backtrace(backtrace) do |logger|
          type = @dangerous_suspend ? :warn : :debug
          logger.send(type, "Terminating task: type=#{@type.inspect}, meta=#{@meta.inspect}, status=#{@status.inspect}")
        end
      end
      # rubocop:enable Style/GlobalVars

      exception = TaskTerminated.new("task was terminated")
      exception.set_backtrace(caller)
      resume exception
    end

    # Is this task running in exclusive mode?
    def exclusive?
      @exclusive
    end

    # Backtrace of the task; this base implementation returns nil.
    def backtrace; end

    # Is the current task still running? True for every status except :dead.
    def running?
      @status != :dead
    end

    # Nicer string inspect for tasks
    def inspect
      "#<#{self.class}:0x#{object_id.to_s(16)} @type=#{@type.inspect}, @meta=#{@meta.inspect}, @status=#{@status.inspect}>"
    end

    # Report a misuse of the task API.
    #
    # Does nothing unless $CELLULOID_DEBUG is set. In debug mode the message
    # is logged as a warning when guard_warnings is enabled and raised as a
    # RuntimeError otherwise.
    def guard(message)
      # !!! DO NOT INTRODUCE ADDITIONAL GLOBAL VARIABLES !!!
      # rubocop:disable Style/GlobalVars
      return unless $CELLULOID_DEBUG
      # rubocop:enable Style/GlobalVars

      raise message unless @guard_warnings
      Internals::Logger.warn message
    end

    private

    # Rename the native thread (only when RUBY_PLATFORM is "java").
    #
    # A non-nil name saves the thread's present name in a thread-local and
    # applies the new one; nil restores the saved name and clears the slot.
    def name_current_thread(new_name)
      return unless RUBY_PLATFORM == "java"

      if new_name.nil?
        new_name = Thread.current[:celluloid_original_thread_name]
        Thread.current[:celluloid_original_thread_name] = nil
        # Nothing was saved when the task body failed before the thread was
        # renamed. Handing nil to the native thread would raise from inside
        # the caller's ensure block and hide that first failure.
        return if new_name.nil?
      else
        Thread.current[:celluloid_original_thread_name] = Thread.current.to_java.getNativeThread.get_name
      end

      Thread.current.to_java.getNativeThread.set_name(new_name)
    end

    # Build the thread label "[Celluloid] Class#method" from the current
    # actor's subject class and meta[:method_name], substituting
    # "<no actor>" / "<no method>" when either is unavailable.
    def thread_metadata
      method = @meta && @meta[:method_name] || "<no method>"
      klass = Thread.current[:celluloid_actor] &&
              Thread.current[:celluloid_actor].behavior.subject.bare_object.class ||
              "<no actor>"
      format("[Celluloid] %s#%s", klass, method)
    end
  end
end
|