1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
|
require 'concurrent/concern/logging'
module Concurrent
module Actor
# New actor is defined by subclassing {RestartingContext}, {Context} and defining its abstract methods.
# {AbstractContext} can be subclassed directly to implement more specific behaviour see {Root} implementation.
#
# - {Context}
#
# > {include:Actor::Context}
#
# - {RestartingContext}.
#
# > {include:Actor::RestartingContext}
#
# Example of ac actor definition:
#
# {include:file:docs-source/actor/define.out.rb}
#
# See methods of {AbstractContext} what else can be tweaked, e.g {AbstractContext#default_reference_class}
#
# @abstract implement {AbstractContext#on_message} and {AbstractContext#behaviour_definition}
class AbstractContext
include TypeCheck
include InternalDelegations
attr_reader :core
# @abstract override to define Actor's behaviour
# @param [Object] message
# @return [Object] a result which will be used to set the Future supplied to Reference#ask
# @note self should not be returned (or sent to other actors), {#reference} should be used
# instead
def on_message(message)
raise NotImplementedError
end
# override to add custom code invocation on internal events like `:terminated`, `:resumed`, `anError`.
def on_event(event)
end
# @api private
def on_envelope(envelope)
@envelope = envelope
on_message envelope.message
ensure
@envelope = nil
end
# if you want to pass the message to next behaviour, usually
# {Behaviour::ErrorsOnUnknownMessage}
def pass
core.behaviour!(Behaviour::ExecutesContext).pass envelope
end
# Defines an actor responsible for dead letters. Any rejected message send
# with {Reference#tell} is sent there, a message with future is considered
# already monitored for failures. Default behaviour is to use
# {AbstractContext#dead_letter_routing} of the parent, so if no
# {AbstractContext#dead_letter_routing} method is overridden in
# parent-chain the message ends up in `Actor.root.dead_letter_routing`
# agent which will log warning.
# @return [Reference]
def dead_letter_routing
parent.dead_letter_routing
end
# @return [Array<Array(Behavior::Abstract, Array<Object>)>]
def behaviour_definition
raise NotImplementedError
end
# @return [Envelope] current envelope, accessible inside #on_message processing
def envelope
@envelope or raise 'envelope not set'
end
# override if different class for reference is needed
# @return [CLass] descendant of {Reference}
def default_reference_class
Reference
end
# override to se different default executor, e.g. to change it to global_operation_pool
# @return [Executor]
def default_executor
Concurrent.global_io_executor
end
# tell self a message
def tell(message)
reference.tell message
end
def ask(message)
raise 'actor cannot ask itself'
end
alias_method :<<, :tell
alias_method :ask!, :ask
# Behaves as {Concurrent::Actor.spawn} but :class is auto-inserted based on receiver so it can be omitted.
# @example by class and name
# AdHoc.spawn(:ping1) { -> message { message } }
#
# @example by option hash
# inc2 = AdHoc.spawn(name: 'increment by 2',
# args: [2],
# executor: Concurrent.configuration.global_task_pool) do |increment_by|
# lambda { |number| number + increment_by }
# end
# inc2.ask!(2) # => 4
# @see Concurrent::Actor.spawn
def self.spawn(name_or_opts, *args, &block)
Actor.spawn to_spawn_options(name_or_opts, *args), &block
end
# behaves as {Concurrent::Actor.spawn!} but :class is auto-inserted based on receiver so it can be omitted.
def self.spawn!(name_or_opts, *args, &block)
Actor.spawn! to_spawn_options(name_or_opts, *args), &block
end
private
def initialize_core(core)
@core = Type! core, Core
end
def self.to_spawn_options(name_or_opts, *args)
if name_or_opts.is_a? ::Hash
if name_or_opts.key?(:class) && name_or_opts[:class] != self
raise ArgumentError,
':class option is ignored when calling on context class, use Actor.spawn instead'
end
name_or_opts.merge class: self
else
{ class: self, name: name_or_opts, args: args }
end
end
# to avoid confusion with Kernel.spawn
undef_method :spawn
end
# Basic Context of an Actor. It supports only linking and it simply terminates on error.
# Uses {Behaviour.basic_behaviour_definition}:
#
# @abstract implement {AbstractContext#on_message}
class Context < AbstractContext
def behaviour_definition
Behaviour.basic_behaviour_definition
end
end
# Context of an Actor for robust systems. It supports supervision, linking, pauses on error.
# Uses {Behaviour.restarting_behaviour_definition}
#
# @abstract implement {AbstractContext#on_message}
class RestartingContext < AbstractContext
def behaviour_definition
Behaviour.restarting_behaviour_definition
end
end
end
end
|