1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
|
require 'set'
module Concurrent
module Actor
module Utils
# Makes it easy to build pub/sub channels.
# @example news
# news_channel = Concurrent::Actor::Utils::Broadcast.spawn :news
#
# 2.times do |i|
# Concurrent::Actor::Utils::AdHoc.spawn "listener-#{i}" do
# news_channel << :subscribe
# -> message { puts message }
# end
# end
#
# news_channel << 'Ruby rocks!'
# # prints: 'Ruby rocks!' twice
class Broadcast < RestartingContext
  # Starts with an empty set of receivers.
  def initialize
    @receivers = Set.new
  end

  # Handles the subscription protocol and broadcasts every other message.
  #
  # Recognised control messages (all act on `envelope.sender`):
  # * `:subscribe`   - adds the sender to the receivers
  # * `:unsubscribe` - removes the sender from the receivers
  # * `:subscribed?` - reports whether the sender is currently a receiver
  #
  # Any other message is pushed with `<<` to each of {#filtered_receivers}.
  #
  # @param message [Object] a control symbol or a payload to broadcast
  # @return [Boolean] for `:subscribe`, `true` when the sender is a Reference
  #   and was added, `false` otherwise
  # @return [Boolean] for `:unsubscribe`, `true` only when the sender was
  #   actually subscribed and has been removed
  # @return [Boolean] for `:subscribed?`, whether the sender is subscribed
  # @return [Object] for a broadcast, whatever `filtered_receivers.each` returns
  def on_message(message)
    case message
    when :subscribe
      subscriber = envelope.sender
      # Senders that are not a Reference are refused.
      return false unless subscriber.is_a?(Reference)

      @receivers.add(subscriber)
      true
    when :unsubscribe
      # Set#delete always returns the set itself (always truthy); Set#delete?
      # returns nil when the element was absent, so the reply reflects
      # whether anything was really removed.
      !@receivers.delete?(envelope.sender).nil?
    when :subscribed?
      @receivers.include?(envelope.sender)
    else
      filtered_receivers.each { |receiver| receiver << message }
    end
  end

  # The receivers a broadcast message is delivered to.
  # Override to define different behaviour, filtering, etc.
  #
  # @return [Set] the current receivers
  def filtered_receivers
    @receivers
  end
end
end
end
end
|