File: observable_thing.rb

package info (click to toggle)
ruby-xmpp4r 0.5.6-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bullseye, forky, sid, trixie
  • size: 1,384 kB
  • sloc: ruby: 17,382; xml: 74; sh: 12; makefile: 4
file content (191 lines) | stat: -rw-r--r-- 5,020 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# =XMPP4R - XMPP Library for Ruby
#
# This file's copyright (c) 2009 by Pablo Lorenzzoni <pablo@propus.com.br>
# License:: Ruby's license (see the LICENSE file) or GNU GPL, at your option.
# Website::http://xmpp4r.github.io
#
# This was based on Observable module from Ruby.
module ObservableThing
  require 'xmpp4r/observable/thread_store'

  # Adds an observer for some "thing".
  #
  # thing:: what will be observed.
  # observer:: the observer.
  # func:: the observer method that will be called (default: :update).
  def add_observer(thing, observer, func = :update)
    @things = {} unless defined? @things
    @things[thing] = {} unless ! @things[thing].nil?
    unless observer.respond_to? func
      raise NoMethodError, "observer does not respond to `#{func.to_s}'"
    end
    @things[thing][observer] = func unless @things[thing].include?(observer)
  end

  # Deletes an observer for some "thing".
  #
  # thing:: what has been observed.
  # observer:: the observer.
  def delete_observer(thing, observer)
    @things[thing].delete observer if defined? @things and ! @things[thing].nil?
  end

  # Delete observers for some "thing".
  #
  # thing:: what has been observed (if nil, deletes all observers).
  def delete_observers(thing = nil)
    if thing.nil?
      @things.clear if defined? @things
    else
      @things[thing].clear if defined? @things and ! @things[thing].nil?
    end
  end

  # Count the number of observers for some "thing".
  #
  # thing:: what has been observed (if nil, count all observers).
  def count_observers(thing = nil)
    return 0 if ! defined? @things
    size = 0
    if thing.nil?
      @things.each { |thing, hash|
        size += hash.size
      }
    else
      size = @things[thing].size unless @things[thing].nil?
    end
    size
  end

  # Count the number of notifications for some "thing".
  #
  # thing:: what has been observed.
  def count_notifications(thing)
    return 0 if (! defined?(@things_counter)) or (! @things_counter.include?(thing))
    @things_counter[thing]
  end

  # Change the state of some "thing".
  #
  # thing:: what will have the state changed.
  # state:: the state (default = true).
  def changed(thing, state = true)
    @things_state = {} unless defined? @things_state
    @things_state[thing] = state
  end

  # Check the state of some "thing".
  #
  # thing: what to have its state checked.
  def changed?(thing)
    if defined? @things_state and @things_state[thing]
      true
    else
      false
    end
  end

  # Notify all observers of "thing" about something. This will only be
  # enforced if the state of that "thing" is true. Also, if the observer
  # returns the Symbol :delete_me, it will be deleted after being notified.
  #
  # thing:: what has been observed.
  # args:: notification to be sent to the observers of "thing".
  def notify_observers(thing, *arg)
    if changed?(thing)
      if defined? @things and ! @things[thing].nil?
        @things[thing].each { |observer, func|
          increase_counter(thing)
          @thread_store = ThreadStore.new if ! defined? @thread_store
          @thread_store.add Thread.new {
            if observer.send(func, thing, *arg) == :delete_me
              delete_observer(thing, observer)
            end
          }
        }
      end
      changed(thing, false)
    end
  end

  # Is there pending notifications?
  def pending_notifications?
    return false if ! defined? @thread_store
    @thread_store.size > 0
  end

  # Wait all notifications
  def wait_notifications
    sleep 1 while pending_notifications?
  end

  private
  def increase_counter(thing)
    @things_counter = {} unless defined? @things_counter
    @things_counter[thing] = 0 unless @things_counter.include?(thing)
    @things_counter[thing] += 1
  end
end

# QObserver - simple observer-to-queue class
class QObserver
  require 'thread'

  def initialize
    @queues = Hash.new
  end

  def inspect
    h = {}
    @queues.each do |q, queue|
      h[q] = queue.size
    end
    if h.length > 0
      sprintf("#<%s:0x%x size=%s>", self.class.name, __id__, h.inspect)
    else
      sprintf("#<%s:0x%x>", self.class.name, __id__)
    end
  end  

  # Return the queues we have registered
  def queues
    @queues.keys
  end

  # Received something in this queue?
  #
  # q:: queue
  def received?(q)
    @queues.include?(q) and ! @queues[q].empty?
  end

  # Get the contents of the queue in an array (or pass each item to the
  # given block.
  #
  # q:: queue
  def received(q)
    return nil if ! @queues.include?(q)
    if block_given?
      yield @queues[q].deq while ! @queues[q].empty?
    else
      a = []
      a << @queues[q].deq while ! @queues[q].empty?
      return a
    end
  end

  # Get the size of a given queue
  #
  # q:: queue
  def size(q)
    @queues[q].size rescue 0
  end

  # update method for our Observer
  #
  # thing:: what to be updated
  def update(thing, *args)
    @queues[thing] = Queue.new if ! @queues.include?(thing)
    @queues[thing].enq args
  end
end