File: termination.rb

package info (click to toggle)
ruby-concurrent 1.1.6%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 30,284 kB
  • sloc: ruby: 30,875; java: 6,117; ansic: 288; makefile: 9; sh: 6
file content (85 lines) | stat: -rw-r--r-- 2,843 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
module Concurrent
  module Actor
    module Behaviour

      # Handles actor termination. Waits until all its children are terminated,
      # can be configured on behaviour initialization.
      # @note Actor rejects envelopes when terminated.
      # @note TODO missing example
      class Termination < Abstract

        # @!attribute [r] terminated
        #   @return [Edge::Event] event which will become set when actor is terminated.
        attr_reader :terminated

        def initialize(core, subsequent, core_options, trapping = false, terminate_children = true)
          super core, subsequent, core_options
          @terminated         = Concurrent::Promises.resolvable_future
          @public_terminated  = @terminated.with_hidden_resolvable
          @trapping           = trapping
          @terminate_children = terminate_children
        end

        # @note Actor rejects envelopes when terminated.
        # @return [true, false] if actor is terminated
        def terminated?
          @terminated.resolved?
        end

        def trapping?
          @trapping
        end

        def trapping=(val)
          @trapping = !!val
        end

        def on_envelope(envelope)
          command, reason = envelope.message
          case command
          when :terminated?
            terminated?
          when :terminate!
            if trapping? && reason != :kill
              pass envelope
            else
              terminate! reason, envelope
            end
          when :termination_event
            @public_terminated
          else
            if terminated?
              reject_envelope envelope
              MESSAGE_PROCESSED
            else
              pass envelope
            end
          end
        end

        # Terminates the actor. Any Envelope received after termination is rejected.
        # Terminates all its children, does not wait until they are terminated.
        def terminate!(reason = nil, envelope = nil)
          return true if terminated?

          self_termination = Concurrent::Promises.resolved_future(reason.nil?, reason.nil? || nil, reason)
          all_terminations = if @terminate_children
                               Concurrent::Promises.zip(*children.map { |ch| ch.ask(:terminate!) }, self_termination)
                             else
                               self_termination
                             end

          all_terminations.chain_resolvable(@terminated)
          if envelope && envelope.future
            all_terminations.chain { |fulfilled, _, t_reason| envelope.future.resolve fulfilled, true, t_reason }
          end

          broadcast(true, [:terminated, reason]) # TODO do not end up in Dead Letter Router
          parent << :remove_child if parent

          MESSAGE_PROCESSED
        end
      end
    end
  end
end