1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
|
module Celluloid
module Notifications
# Returns the actor registered under :notifications_fanout.
#
# Raises DeadActorError when the registry lookup yields nothing, i.e. the
# fanout actor is not currently running.
def self.notifier
  fanout = Actor[:notifications_fanout]
  return fanout if fanout

  raise(DeadActorError, "notifications fanout actor not running")
end
# Forwards a notification to the fanout actor.
#
# pattern - topic being published.
# args    - extra payload passed through untouched.
#
# Returns whatever the fanout's #publish returns, or nil when the fanout
# actor is unavailable (DeadActorError is swallowed on purpose).
def publish(pattern, *args)
  fanout = Celluloid::Notifications.notifier
  fanout.publish(pattern, *args)
rescue DeadActorError
  # Best effort only: the fanout may already be gone during a messy
  # shutdown, in which case the notification is silently dropped.
  # TODO: add test coverage for this path.
  nil
end
# Also expose #publish as Celluloid::Notifications.publish.
module_function :publish
# Subscribes the current actor to notifications matching +pattern+.
#
# pattern - value the fanout compares published topics against.
# method  - name of the method to invoke on the current actor.
#
# Returns the result of the fanout's #subscribe call.
def subscribe(pattern, method)
  fanout = Celluloid::Notifications.notifier
  fanout.subscribe(Actor.current, pattern, method)
end
# Removes subscriptions by handing every argument straight to the fanout
# actor's #unsubscribe.
def unsubscribe(*args)
  fanout = Celluloid::Notifications.notifier
  fanout.unsubscribe(*args)
end
# Actor that holds every Subscriber and fans each published notification
# out to the subscribers whose pattern matches it.
class Fanout
  include Celluloid

  # When a linked actor exits, #prune drops its subscriptions.
  trap_exit :prune

  def initialize
    # pattern => matching subscribers; a memo that is wiped whenever the
    # subscriber list changes.
    @listeners_for = {}
    @subscribers = []
  end

  # Registers +actor+ for notifications matching +pattern+, to be delivered
  # by calling +method+ on it. Links to the actor and returns the new
  # Subscriber.
  def subscribe(actor, pattern, method)
    subscriber = Subscriber.new(actor, pattern, method)
    @subscribers << subscriber
    link actor
    @listeners_for.clear
    subscriber
  end

  # Drops every subscription for which Subscriber#matches? accepts
  # +subscriber+, then invalidates the memo.
  def unsubscribe(subscriber)
    @subscribers.delete_if { |entry| entry.matches?(subscriber) }
    @listeners_for.clear
  end

  # Hands +pattern+ and +args+ to every subscriber listening for +pattern+.
  def publish(pattern, *args)
    listeners_for(pattern).each do |listener|
      listener.publish(pattern, *args)
    end
  end

  # Returns the subscribers interested in +pattern+, memoized per pattern.
  def listeners_for(pattern)
    cached = @listeners_for[pattern]
    return cached if cached

    @listeners_for[pattern] = @subscribers.select do |candidate|
      candidate.subscribed_to?(pattern)
    end
  end

  # True when at least one subscriber is interested in +pattern+.
  def listening?(pattern)
    listeners_for(pattern).any?
  end

  # Exit handler: forgets every subscription owned by +actor+ and
  # invalidates the memo.
  def prune(actor, _reason = nil)
    @subscribers.delete_if { |entry| entry.actor == actor }
    @listeners_for.clear
  end
end
# One subscription: the receiving actor, the pattern it listens for and
# the method to invoke on delivery.
class Subscriber
  attr_accessor :actor, :pattern, :method

  def initialize(actor, pattern, method)
    @actor = actor
    @pattern = pattern
    @method = method
  end

  # Asynchronously invokes the subscribed method on the actor, passing the
  # published pattern followed by +args+.
  def publish(pattern, *args)
    actor.async(method, pattern, *args)
  rescue DeadActorError
    # Best effort only: the receiver may already be dead during a messy
    # shutdown, in which case the notification is dropped.
    # TODO: add test coverage for this path.
    nil
  end

  # True when a notification published under +pattern+ should be delivered
  # here. A nil/false +pattern+ is accepted by every subscriber; otherwise
  # the stored pattern is tried against the string form first and then
  # against the raw value.
  def subscribed_to?(pattern)
    return true unless pattern

    @pattern === pattern.to_s || @pattern === pattern
  end

  # Truthy when the argument is this subscriber itself, or a value that the
  # stored pattern (if any) matches with ===.
  def matches?(subscriber_or_pattern)
    return true if self === subscriber_or_pattern

    @pattern && @pattern === subscriber_or_pattern
  end
end
end
# Convenience shortcut: Celluloid.publish hands every argument to
# Celluloid::Notifications.publish.
def self.publish(*args)
  Celluloid::Notifications.publish(*args)
end
end
|