File: linking.rb

package info (click to toggle)
ruby-concurrent 1.1.6%2Bdfsg-5
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 30,284 kB
  • sloc: ruby: 30,875; java: 6,117; javascript: 1,114; ansic: 288; makefile: 10; sh: 6
file content (83 lines) | stat: -rw-r--r-- 2,661 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
module Concurrent
  module Actor
    module Behaviour
      # TODO track what is linked, clean when :terminated
      #   send :linked/:unlinked messages back to build the array of linked actors

      # Links the actor to other actors and forwards the actor's events to them,
      # such as `:terminated`, `:paused`, `:resumed`, and errors.
      # A linked actor needs to handle those messages.
      #
      #     listener = AdHoc.spawn name: :listener do
      #       lambda do |message|
      #         case message
      #         when Reference
      #           if message.ask!(:linked?)
      #             message << :unlink
      #           else
      #             message << :link
      #           end
      #         else
      #           puts "got event #{message.inspect} from #{envelope.sender}"
      #         end
      #       end
      #     end
      #
      #     an_actor = AdHoc.spawn name: :an_actor, supervise: true, behaviour_definition: Behaviour.restarting_behaviour_definition do
      #       lambda { |message| raise 'failed'}
      #     end
      #
      #     # link the actor
      #     listener.ask(an_actor).wait
      #     an_actor.ask(:fail).wait
      #     # unlink the actor
      #     listener.ask(an_actor).wait
      #     an_actor.ask(:fail).wait
      #     an_actor << :terminate!
      #
      # produces only two events; the other events happened after unlinking
      #
      #     got event #<RuntimeError: failed> from #<Concurrent::Actor::Reference /an_actor (Concurrent::Actor::Utils::AdHoc)>
      #     got event :reset from #<Concurrent::Actor::Reference /an_actor (Concurrent::Actor::Utils::AdHoc)>
      class Linking < Abstract
        # Sets up the behaviour with an empty set of linked references.
        #
        # Unless `core_options[:link]` is exactly `false` (a missing key or
        # `nil` still links), the value returned by `Actor.current` at
        # construction time is stored as the first linked reference.
        #
        # @param core forwarded unchanged to the superclass initializer
        # @param subsequent forwarded unchanged to the superclass initializer
        # @param core_options options object; only `[:link]` is read here
        def initialize(core, subsequent, core_options)
          super(core, subsequent, core_options)
          @linked = Set.new
          link_to_current = core_options[:link] != false
          @linked.add(Actor.current) if link_to_current
        end

        # Handles the linking protocol messages and hands every other
        # envelope to `pass`.
        #
        # * `:link`    - links the envelope's sender, returns `true`
        # * `:unlink`  - unlinks the envelope's sender, returns `true`
        # * `:linked?` - returns whether the sender is currently linked
        # * `:linked`  - returns the linked references as an Array
        #
        # @param envelope object responding to `message` and `sender`
        # @return the result described above, or whatever `pass` returns
        def on_envelope(envelope)
          case envelope.message
          when :link    then link(envelope.sender)
          when :unlink  then unlink(envelope.sender)
          when :linked? then @linked.include?(envelope.sender)
          when :linked  then @linked.to_a
          else pass(envelope)
          end
        end

        # Adds `ref` to the set of linked references.
        #
        # @param ref the reference to start sending events to
        # @return [true] always
        def link(ref)
          @linked << ref
          true
        end

        # Removes `ref` from the set of linked references; a reference that
        # was never linked is silently ignored.
        #
        # @param ref the reference to stop sending events to
        # @return [true] always
        def unlink(ref)
          @linked.delete ref
          true
        end

        # Forwards a public event to every linked reference via `<<`, drops
        # all links once the event is `:terminated`, and then delegates to
        # the superclass implementation.
        #
        # @param public truthy when the event should be sent to linked references
        # @param event the event itself; when it destructures like an Array,
        #   its first element is treated as the event name
        # @return whatever the superclass `on_event` returns
        def on_event(public, event)
          # Destructure so that both `:name` and `[:name, data]` yield the name.
          event_name, = event
          if public
            @linked.each { |linked_ref| linked_ref << event }
          end
          # Links are cleared only after the event itself has been delivered.
          @linked.clear if event_name == :terminated
          super(public, event)
        end
      end
    end
  end
end