File: core.rb

package info (click to toggle)
ruby-concurrent 1.1.6%2Bdfsg-5
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 30,284 kB
  • sloc: ruby: 30,875; java: 6,117; javascript: 1,114; ansic: 288; makefile: 10; sh: 6
file content (220 lines) | stat: -rw-r--r-- 7,925 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
require 'concurrent/concern/logging'
require 'concurrent/executors'

module Concurrent
  module Actor

    require 'set'

    # Core of the actor.
    # @note The whole class should be considered private. A user should use {Context}s and {Reference}s only.
    # @note devel: the core should not block on anything, e.g. it cannot wait on children to terminate,
    #   as that would eat up all threads in the task pool and deadlock.
    class Core < Synchronization::LockableObject
      include TypeCheck
      include Concern::Logging

      # @!attribute [r] reference
      #   Reference to this actor which can be safely passed around.
      #   @return [Reference]
      # @!attribute [r] name
      #   The name of the actor instance; it should be unique (not enforced). Allows easier orientation
      #   between actor instances.
      #   @return [String]
      # @!attribute [r] path
      #   Path of this actor. It is used for easier orientation and logging.
      #   Path is constructed recursively with: `parent.path + self.name` up to a {Actor.root},
      #   e.g. `/an_actor/its_child`.
      #   @return [String]
      # @!attribute [r] executor
      #   Executor which is used to process messages.
      #   @return [Executor]
      # @!attribute [r] context_class
      #   A subclass of {AbstractContext} representing Actor's behaviour.
      #   @return [Class]
      attr_reader :reference, :name, :path, :executor, :context_class, :context, :behaviour_definition

      # Creates the core; the whole setup (see #ns_initialize) runs inside `synchronize`.
      #
      # @param [Hash] opts
      # @option opts [String, Symbol] name required, stored as a String
      # @option opts [Class] class required, a descendant of {AbstractContext} to be instantiated,
      #   it defines the Actor's behaviour
      # @option opts [Array<Object>] args arguments for the context instantiation (default: `[]`)
      # @option opts [Executor] executor, defaults to the context's `default_executor`
      # @option opts [true, false] link, whether to link the actor to its parent; not read by Core
      #   itself, the whole opts hash is handed over to every behaviour
      # @option opts [Class] reference_class a custom descendant of {Reference} to use,
      #   defaults to the context's `default_reference_class`
      # @option opts [Array<Array(Behavior::Abstract, Array<Object>)>] behaviour_definition, array of pairs
      #   where each pair is behaviour class and its args, see {Behaviour.basic_behaviour_definition}
      # @option opts [ResolvableFuture, nil] initialized, if present it is fulfilled with the reference
      #   or rejected with the error once the {Context} initialization was attempted
      # @option opts [Reference, nil] parent **private api** parent of the actor (the one spawning)
      # @option opts [Proc, nil] logger a proc accepting (level, progname, message = nil, &block) params,
      #   can be used to hook actor instance to any logging system, see {Concurrent::Concern::Logging}
      # @param [Proc] block for class instantiation
      def initialize(opts = {}, &block)
        # the block belongs to the context, so none is forwarded to the superclass
        super(&nil)
        synchronize do
          ns_initialize(opts, &block)
        end
      end

      # Reference of the parent actor.
      # When an actor is spawned the {Actor.current} becomes its parent; when it is spawned from
      # a thread outside of any actor ({Actor.current} is nil) the {Actor.root} is assigned.
      # @return [Reference, nil] nil when this core has no parent core
      def parent
        core = @parent_core
        return core unless core

        core.reference
      end

      # Delegates to the context of this actor.
      # @see AbstractContext#dead_letter_routing
      # @return whatever the context's `dead_letter_routing` returns
      def dead_letter_routing
        actor_context = @context
        actor_context.dead_letter_routing
      end

      # Snapshot of the currently registered children.
      # May only be called from inside this actor (see #guard!).
      # @return [Array<Reference>] of children actors
      def children
        guard!
        registered = @children
        registered.to_a
      end

      # Registers a child of this actor. May only be called from inside this actor (see #guard!).
      # @api private
      # @param [Reference] child
      # @return [nil]
      def add_child(child)
        guard!
        Type!(child, Reference)
        @children << child
        nil
      end

      # Unregisters a child of this actor. May only be called from inside this actor (see #guard!).
      # @api private
      # @param [Reference] child
      # @return [nil]
      def remove_child(child)
        guard!
        Type!(child, Reference)
        @children.delete(child)
        nil
      end

      # Entry point for incoming messages: invoked by Reference (or by an alternative Reference
      # implementation) to schedule processing of a new message.
      # Logs the arrival, then hands the envelope to #process_envelope through #schedule_execution.
      # @param [Envelope] envelope
      # @return [nil]
      def on_envelope(envelope)
        # Built lazily so that nothing is inspected unless the log block is actually evaluated.
        # 'is ' carries a trailing space so that both log lines line up.
        describe = lambda do |prefix|
          "#{prefix} #{envelope.future ? 'asked' : 'told'} #{envelope.message.inspect} by #{envelope.sender}"
        end

        log(DEBUG) { describe.call('is ') }
        schedule_execution do
          log(DEBUG) { "#{describe.call('was')} - processing" }
          process_envelope(envelope)
        end
        nil
      end

      # Ensures that the caller runs inside of this actor, i.e. that {Actor.current} equals
      # this actor's reference.
      # @return [nil]
      # @raise [RuntimeError] when called from anywhere else
      def guard!
        return if Actor.current == reference

        raise "can be called only inside actor #{reference} but was #{Actor.current}"
      end

      # Logs via the inherited `log`, inserting this actor's path right after the level
      # (presumably the progname position of Concern::Logging#log).
      # @param level one of the logging severities, e.g. DEBUG
      # @param [Object, nil] message
      # @param [Proc] block may be given instead of the message
      def log(level, message = nil, &block)
        super(level, @path, message, &block)
      end

      # Posts the given block to @executor through @serialized_execution.
      # The block runs inside `synchronize`; while it runs, the thread-local `:__current_actor__`
      # is set to this actor's reference (presumably what Actor.current reads) and it is reset
      # to nil afterwards. A StandardError raised by the block is logged as FATAL and swallowed.
      # @yield the work to execute
      # @return [nil]
      def schedule_execution
        @serialized_execution.post(@executor) do
          synchronize do
            begin
              Thread.current[:__current_actor__] = reference
              yield
            rescue StandardError => error
              log(FATAL, error)
            ensure
              Thread.current[:__current_actor__] = nil
            end
          end
        end

        nil
      end

      # Logs the event and passes it to the first behaviour's `on_event`.
      # @param [true, false] public logged as 'public' when truthy, 'private' otherwise
      # @param [Object] event
      # @return whatever the first behaviour's `on_event` returns
      def broadcast(public, event)
        log(DEBUG) do
          visibility = public ? 'public' : 'private'
          "event: #{event.inspect} (#{visibility})"
        end
        @first_behaviour.on_event(public, event)
      end

      # Looks up the behaviour instance registered under the given class.
      # @param [Class] behaviour_class
      # @return [Behaviour::Abstract, nil] based on behaviour_class, nil when there is none
      def behaviour(behaviour_class)
        @behaviours.fetch(behaviour_class, nil)
      end

      # Strict variant of #behaviour.
      # @param [Class] behaviour_class
      # @return [Behaviour::Abstract] based on behaviour_class
      # @raise [KeyError] when no behaviour is registered under behaviour_class
      def behaviour!(behaviour_class)
        @behaviours.fetch(behaviour_class)
      end

      # Allocates the context instance without running its `initialize` (see #build_context).
      # @api private
      # @return the freshly allocated context, also stored in @context
      def allocate_context
        blank_context = @context_class.allocate
        @context = blank_context
      end

      # Finishes the construction of the previously allocated context: first hands it this core
      # through `initialize_core`, then runs its `initialize` with the stored args and block.
      # Both are invoked with `send`, so they do not have to be public.
      # @api private
      # @return whatever the context's `initialize` returns
      def build_context
        @context.send(:initialize_core, self)
        @context.send(:initialize, *@args, &@block)
      end

      # Hands the envelope to the first behaviour.
      # @api private
      # @param [Envelope] envelope
      # @return whatever the first behaviour's `on_envelope` returns
      def process_envelope(envelope)
        @first_behaviour.on_envelope(envelope)
      end

      private

      # Performs the actual setup of the core; invoked from #initialize inside `synchronize`.
      #
      # Reads the options, allocates the context, registers this actor with its parent, builds the
      # behaviours and finally hands the construction of the context over to #schedule_execution.
      #
      # @param [Hash] opts see #initialize
      # @param [Proc] block stored and later passed to the context's `initialize`
      # @raise [KeyError] when `:class` or `:name` is missing (both are fetched without a default;
      #   assuming opts is a Hash)
      # @raise [RuntimeError] when there is no parent and the name is not '/'
      def ns_initialize(opts, &block)
        # NOTE(review): @mailbox is not read anywhere in the visible part of this class
        @mailbox              = ::Array.new
        @serialized_execution = SerializedExecution.new
        @children             = Set.new

        @context_class = Child! opts.fetch(:class), AbstractContext
        # the context is only allocated (its initialize has not run yet) so that the defaults
        # it provides can be queried below
        allocate_context

        @executor = Type! opts.fetch(:executor, @context.default_executor), Concurrent::AbstractExecutorService

        @reference = (Child! opts[:reference_class] || @context.default_reference_class, Reference).new self
        # the name may be given as a String or a Symbol, it is always stored as a String
        @name      = (Type! opts.fetch(:name), String, Symbol).to_s

        parent       = opts[:parent]
        # for a nil parent the `&&` short-circuits and @parent_core stays nil
        # (assuming Type! returns the checked value); `core` is reached through `send`
        @parent_core = (Type! parent, Reference, NilClass) && parent.send(:core)
        if @parent_core.nil? && @name != '/'
          raise 'only root has no parent'
        end

        @path   = @parent_core ? File.join(@parent_core.path, @name) : @name
        @logger = opts[:logger]

        # add_child is guarded by guard!, so this raises unless we are running inside the parent actor
        @parent_core.add_child reference if @parent_core

        initialize_behaviours opts

        @args       = opts.fetch(:args, [])
        @block      = block
        initialized = Type! opts[:initialized], Promises::ResolvableFuture, NilClass

        # the context's initialize is run through schedule_execution rather than directly here
        schedule_execution do
          begin
            build_context
            initialized.fulfill reference if initialized
            log DEBUG, 'spawned'
          rescue => ex
            # a failed context initialization is logged, terminates the first behaviour
            # and is reported through the `initialized` future when one was given
            log ERROR, ex
            @first_behaviour.terminate!
            initialized.reject ex if initialized
          end
        end
      end

      # Validates the behaviour definition and instantiates the behaviours.
      #
      # The definition comes from `opts[:behaviour_definition]`, falling back to the context's
      # `behaviour_definition`. The behaviours are built from the last entry to the first one; each
      # instance receives the one built just before it (nil for the last entry), so that
      # @first_behaviour ends up being the instance created for the first entry.
      #
      # @param [Hash] opts the options given to the core, also passed to every behaviour
      # @return [Behaviour::Abstract, nil] the first behaviour, nil when the definition is empty
      def initialize_behaviours(opts)
        definition = Type!(opts[:behaviour_definition] || @context.behaviour_definition, ::Array)
        definition.each do |(behaviour_class, _)|
          Child! behaviour_class, Behaviour::Abstract
        end
        @behaviour_definition = definition

        @behaviours = {}
        following   = nil
        definition.reverse.each do |(behaviour_class, *args)|
          following = @behaviours[behaviour_class] = behaviour_class.new(self, following, opts, *args)
        end
        @first_behaviour = following
      end
    end
  end
end