Class: Concurrent::Actor::Utils::Broadcast

Inherits:
RestartingContext
Defined in:
lib/concurrent/actor/utils/broadcast.rb

Overview

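Makes it easy to build publish/subscribe (pub/sub) messaging. An actor subscribes by sending :subscribe to the broadcast actor; any message other than :subscribe, :unsubscribe, or :subscribed? is forwarded to every subscribed receiver.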

Examples:

news

news_channel = Concurrent::Actor::Utils::Broadcast.spawn :news

2.times do |i|
  Concurrent::Actor::Utils::AdHoc.spawn "listener-#{i}" do
    news_channel << :subscribe
    -> message { puts message }
  end
end

news_channel << 'Ruby rocks!'
# prints: Ruby rocks! (twice, once per listener)

Instance Attribute Summary

Instance Method Summary

Constructor Details

- (Broadcast) initialize

Returns a new instance of Broadcast

# File 'lib/concurrent/actor/utils/broadcast.rb', line 22

def initialize
  @receivers = Set.new
end

Instance Attribute Details

- (undocumented) core (readonly) Originally defined in class AbstractContext

Instance Method Details

- (undocumented) filtered_receivers

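Override this method to change which receivers a broadcast message is delivered to, for example to filter them. By default it returns every subscribed receiver.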

# File 'lib/concurrent/actor/utils/broadcast.rb', line 45

def filtered_receivers
  @receivers
end
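
The override point described above can be sketched in plain Ruby without the actor runtime. Everything here is a hypothetical stand-in rather than the library's API: MiniBroadcast, VipBroadcast, Inbox, subscribe, and publish are names invented for illustration, and receivers are simple in-memory objects instead of actor references. Only the @receivers set and the filtered_receivers hook mirror the listing above.

```ruby
require 'set'

# Hypothetical receiver: collects whatever is pushed to it with <<.
class Inbox
  attr_reader :name, :messages

  def initialize(name)
    @name = name
    @messages = []
  end

  def <<(message)
    @messages << message
    self
  end
end

# Hypothetical stand-in for Broadcast (not the library class).
class MiniBroadcast
  def initialize
    @receivers = Set.new
  end

  def subscribe(receiver)
    @receivers.add(receiver)
  end

  # Default hook: every subscriber receives the message.
  def filtered_receivers
    @receivers
  end

  def publish(message)
    filtered_receivers.each { |r| r << message }
  end
end

# Subclass that overrides only the hook to narrow delivery.
class VipBroadcast < MiniBroadcast
  def filtered_receivers
    @receivers.select { |r| r.name.start_with?('vip') }
  end
end

vip = Inbox.new('vip-alice')
guest = Inbox.new('guest-bob')

channel = VipBroadcast.new
channel.subscribe(vip)
channel.subscribe(guest)
channel.publish('Ruby rocks!')

p vip.messages    # only the vip inbox received the message
p guest.messages  # the guest inbox was filtered out
```

Because delivery goes through the hook, a subclass can change the audience without touching the subscription bookkeeping.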

- (undocumented) on_message(message)

# File 'lib/concurrent/actor/utils/broadcast.rb', line 26

def on_message(message)
  case message
  when :subscribe
    if envelope.sender.is_a? Reference
      @receivers.add envelope.sender
      true
    else
      false
    end
  when :unsubscribe
    !!@receivers.delete(envelope.sender)
  when :subscribed?
    @receivers.include? envelope.sender
  else
    filtered_receivers.each { |r| r << message }
  end
end
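
The message protocol handled by on_message can be traced with a small plain-Ruby sketch. It is a stand-in under stated assumptions, not the library's code: MiniBroadcast and Inbox are hypothetical names, the sender is passed as an explicit argument where the real actor reads envelope.sender, and Inbox plays the role that Reference plays in the type check. The unsubscribe branch is copied as written; since Ruby's Set#delete returns the set itself, the `!!` expression evaluates to true whether or not the sender was subscribed.

```ruby
require 'set'

# Hypothetical receiver standing in for an actor Reference.
class Inbox
  attr_reader :messages

  def initialize
    @messages = []
  end

  def <<(message)
    @messages << message
    self
  end
end

# Hypothetical stand-in: same branches as on_message above,
# with the sender passed explicitly instead of via an envelope.
class MiniBroadcast
  def initialize
    @receivers = Set.new
  end

  def on_message(message, sender)
    case message
    when :subscribe
      # Only proper receivers may subscribe; anything else is refused.
      if sender.is_a?(Inbox)
        @receivers.add(sender)
        true
      else
        false
      end
    when :unsubscribe
      # Set#delete returns the set, so this is true in every case.
      !!@receivers.delete(sender)
    when :subscribed?
      @receivers.include?(sender)
    else
      # Any other message is fanned out to all subscribers.
      @receivers.each { |r| r << message }
    end
  end
end

channel = MiniBroadcast.new
inbox = Inbox.new

channel.on_message(:subscribe, inbox)        # accepted
channel.on_message(:subscribe, 'not an inbox') # refused
channel.on_message('Ruby rocks!', nil)       # forwarded to inbox
channel.on_message(:unsubscribe, inbox)
channel.on_message('ignored', nil)           # no subscribers left
p inbox.messages
```

The three control symbols are consumed by the broadcaster itself and are never forwarded, so subscribers cannot receive :subscribe, :unsubscribe, or :subscribed? as payloads.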