1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
|
# =XMPP4R - XMPP Library for Ruby
#
# This file's copyright (c) 2009 by Pablo Lorenzzoni <pablo@propus.com.br>
# License:: Ruby's license (see the LICENSE file) or GNU GPL, at your option.
# Website:: http://xmpp4r.github.io
#
# This was based on Observable module from Ruby.
module ObservableThing
  # Mixin that lets the including object keep, per observed "thing", a set
  # of observers and notify them. Each observer is called in its own Thread,
  # and that thread is added to a ThreadStore.
  require 'xmpp4r/observable/thread_store'

  # Adds an observer for some "thing".
  #
  # thing:: what will be observed.
  # observer:: the observer.
  # func:: the observer method that will be called (default: :update).
  #
  # Raises NoMethodError when observer does not respond to func. An observer
  # that is already registered for thing keeps the func it was registered with.
  def add_observer(thing, observer, func = :update)
    @things = {} unless defined? @things
    @things[thing] = {} if @things[thing].nil?
    raise NoMethodError, "observer does not respond to `#{func.to_s}'" unless observer.respond_to? func

    registered = @things[thing]
    registered[observer] = func unless registered.include?(observer)
  end

  # Deletes an observer for some "thing".
  #
  # thing:: what has been observed.
  # observer:: the observer.
  def delete_observer(thing, observer)
    return unless defined? @things

    registered = @things[thing]
    registered.delete(observer) unless registered.nil?
  end

  # Delete observers for some "thing".
  #
  # thing:: what has been observed (if nil, deletes all observers).
  def delete_observers(thing = nil)
    return unless defined? @things

    if thing.nil?
      @things.clear
    else
      registered = @things[thing]
      registered.clear unless registered.nil?
    end
  end

  # Count the number of observers for some "thing".
  #
  # thing:: what has been observed (if nil, count all observers).
  def count_observers(thing = nil)
    return 0 unless defined? @things
    # No thing given: add up the observers registered for every thing.
    return @things.values.inject(0) { |total, registered| total + registered.size } if thing.nil?

    registered = @things[thing]
    registered.nil? ? 0 : registered.size
  end

  # Count the number of notifications for some "thing".
  #
  # thing:: what has been observed.
  def count_notifications(thing)
    if defined?(@things_counter) && @things_counter.include?(thing)
      @things_counter[thing]
    else
      0
    end
  end

  # Change the state of some "thing".
  #
  # thing:: what will have the state changed.
  # state:: the state (default = true).
  def changed(thing, state = true)
    @things_state = {} unless defined? @things_state
    @things_state.store(thing, state)
  end

  # Check the state of some "thing".
  #
  # thing:: what to have its state checked.
  def changed?(thing)
    return false unless defined? @things_state

    @things_state[thing] ? true : false
  end

  # Notify all observers of "thing" about something. This will only be
  # enforced if the state of that "thing" is true. Also, if the observer
  # returns the Symbol :delete_me, it will be deleted after being notified.
  # The state of "thing" is set back to false afterwards.
  #
  # thing:: what has been observed.
  # arg:: notification to be sent to the observers of "thing".
  def notify_observers(thing, *arg)
    return unless changed?(thing)

    if defined?(@things) && !@things[thing].nil?
      @things[thing].each do |observer, func|
        increase_counter(thing)
        @thread_store = ThreadStore.new unless defined? @thread_store
        # One thread per observer call; it is handed to the thread store.
        worker = Thread.new do
          outcome = observer.send(func, thing, *arg)
          delete_observer(thing, observer) if outcome == :delete_me
        end
        @thread_store.add worker
      end
    end
    changed(thing, false)
  end

  # Are there pending notifications?
  def pending_notifications?
    if defined? @thread_store
      @thread_store.size > 0
    else
      false
    end
  end

  # Wait for all notifications, polling once per second.
  def wait_notifications
    while pending_notifications?
      sleep 1
    end
  end

  private

  # Adds one to the notification counter kept for "thing", starting from 0.
  def increase_counter(thing)
    @things_counter = {} unless defined? @things_counter
    @things_counter[thing] = @things_counter.fetch(thing, 0) + 1
  end
end
# QObserver - simple observer-to-queue class
class QObserver
  # Stores every notification received through #update in a Queue keyed by
  # the notified "thing"; the queued items can then be checked and drained
  # with #received?, #received and #size.
  require 'thread'

  def initialize
    @queues = {}
    # Guards creation of and iteration over @queues: ObservableThing calls
    # observers from separate threads, so #update may run concurrently.
    @lock = Mutex.new
  end

  # Shows the class, the object id and, when any queue exists, the number of
  # items waiting in each queue.
  def inspect
    sizes = {}
    # Snapshot under the lock so a concurrent #update cannot add a key while
    # the hash is being iterated.
    @lock.synchronize do
      @queues.each { |name, queue| sizes[name] = queue.size }
    end
    if sizes.empty?
      sprintf("#<%s:0x%x>", self.class.name, __id__)
    else
      sprintf("#<%s:0x%x size=%s>", self.class.name, __id__, sizes.inspect)
    end
  end

  # Return the queues we have registered
  def queues
    @queues.keys
  end

  # Received something in this queue?
  #
  # q:: queue
  def received?(q)
    @queues.include?(q) && !@queues[q].empty?
  end

  # Get the contents of the queue in an array (or pass each item to the
  # given block). Every item is the array of arguments of one notification.
  #
  # q:: queue
  #
  # Returns nil when the queue is unknown or when a block is given.
  def received(q)
    return nil unless @queues.include?(q)

    queue = @queues[q]
    if block_given?
      yield queue.deq until queue.empty?
      nil
    else
      items = []
      items << queue.deq until queue.empty?
      items
    end
  end

  # Get the size of a given queue (0 when the queue is unknown)
  #
  # q:: queue
  def size(q)
    queue = @queues[q]
    queue.nil? ? 0 : queue.size
  end

  # update method for our Observer
  #
  # thing:: what to be updated
  # args:: notification payload; stored in the queue as one array
  def update(thing, *args)
    # Look up or create the queue atomically, so that two threads notifying
    # the same new thing cannot each create a queue and lose a message.
    queue = @lock.synchronize { @queues[thing] ||= Queue.new }
    queue.enq args
  end
end
|