File: stream.rb

package info (click to toggle)
ruby-xmpp4r 0.5.6-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bullseye, forky, sid, trixie
  • size: 1,384 kB
  • sloc: ruby: 17,382; xml: 74; sh: 12; makefile: 4
file content (599 lines) | stat: -rw-r--r-- 17,254 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
# =XMPP4R - XMPP Library for Ruby
# License:: Ruby's license (see the LICENSE file) or GNU GPL, at your option.
# Website::http://xmpp4r.github.io

require 'xmpp4r/callbacks'
require 'socket'
require 'thread'
require 'xmpp4r/semaphore'
require 'xmpp4r/streamparser'
require 'xmpp4r/presence'
require 'xmpp4r/message'
require 'xmpp4r/iq'
require 'xmpp4r/debuglog'
require 'xmpp4r/idgenerator'

module Jabber
  ##
  # The stream class manages a connection stream (a file descriptor using which
  # XML messages are read and sent)
  #
  # You may register callbacks for the three Jabber stanzas
  # (message, presence and iq) and use the send and send_with_id
  # methods.
  #
  # To ensure the order of received stanzas, callback blocks are
  # launched in the parser thread. If further blocking operations
  # are intended in those callbacks, run your own thread there.
  class Stream
    DISCONNECTED = 1
    CONNECTED = 2

    # file descriptor used
    attr_reader :fd

    # connection status
    attr_reader :status

    # number of stanzas currently being processed
    attr_reader :processing

    ##
    # Initialize a new stream
    def initialize
      @fd = nil
      @status = DISCONNECTED
      @xmlcbs = CallbackList.new
      @stanzacbs = CallbackList.new
      @messagecbs = CallbackList.new
      @iqcbs = CallbackList.new
      @presencecbs = CallbackList.new
      @send_lock = Mutex.new
      @last_send = Time.now
      @exception_block = nil
      @tbcbmutex = Mutex.new
      @threadblocks = []
      @wakeup_thread = nil
      @streamid = nil
      @streamns = 'jabber:client'
      @features_sem = Semaphore.new
      @parser_thread = nil
      @processing = 0
    end

    ##
    # Start the XML parser on the fd
    def start(fd)
      @stream_mechanisms = []
      @stream_features = {}

      @fd = fd
      @parser = StreamParser.new(@fd, self)
      @parser_thread = Thread.new do
        Thread.current.abort_on_exception = true
        begin
          @parser.parse
          Jabber::debuglog("DISCONNECTED\n")

          if @exception_block
            Thread.new { close!; @exception_block.call(nil, self, :disconnected) }
          else
            close!
          end
        rescue Exception => e
          Jabber::warnlog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")

          if @exception_block
            Thread.new do
              Thread.current.abort_on_exception = true
              close
              @exception_block.call(e, self, :start)
            end
          else
            Jabber::warnlog "Exception caught in Parser thread! (#{e.class})\n#{e.backtrace.join("\n")}"
            close!
            raise
          end
        end
      end

      @status = CONNECTED
    end

    def stop
      @parser_thread.kill
      @parser = nil
    end

    ##
    # Mounts a block to handle exceptions if they occur during the
    # poll send.  This will likely be the first indication that
    # the socket dropped in a Jabber Session.
    #
    # The block has to take three arguments:
    # * the Exception
    # * the Jabber::Stream object (self)
    # * a symbol where it happened, namely :start, :parser, :sending and :end
    def on_exception(&block)
      @exception_block = block
    end

    ##
    # This method is called by the parser when a failure occurs
    def parse_failure(e)
      Jabber::warnlog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")

      # A new thread has to be created because close will cause the thread
      # to commit suicide(???)
      if @exception_block
        # New thread, because close will kill the current thread
        Thread.new do
          Thread.current.abort_on_exception = true
          close
          @exception_block.call(e, self, :parser)
        end
      else
        Jabber::warnlog "Stream#parse_failure was called by XML parser. Dumping " +
          "backtrace...\n" + e.exception + "\n#{e.backtrace.join("\n")}"
        close
        raise
      end
    end

    ##
    # This method is called by the parser upon receiving <tt></stream:stream></tt>
    def parser_end
      if @exception_block
        Thread.new do
          Thread.current.abort_on_exception = true
          close
          @exception_block.call(nil, self, :close)
        end
      else
        close
      end
    end

    ##
    # Returns if this connection is connected to a Jabber service
    # return:: [Boolean] Connection status
    def is_connected?
      return @status == CONNECTED
    end

    ##
    # Returns if this connection is NOT connected to a Jabber service
    #
    # return:: [Boolean] Connection status
    def is_disconnected?
      return @status == DISCONNECTED
    end

    ##
    # Processes a received REXML::Element and executes
    # registered thread blocks and filters against it.
    #
    # element:: [REXML::Element] The received element
    def receive(element)
      @tbcbmutex.synchronize { @processing += 1 }
      Jabber::debuglog("RECEIVED:\n#{element.to_s}")

      if element.namespace('').to_s == '' # REXML namespaces are always strings
        element.add_namespace(@streamns)
      end

      case element.prefix
      when 'stream'
        case element.name
          when 'stream'
            stanza = element
            @streamid = element.attributes['id']
            @streamns = element.namespace('') if element.namespace('')

            # Hack: component streams are basically client streams.
            # Someday we may want to create special stanza classes
            # for components/s2s deriving from normal stanzas but
            # posessing these namespaces
            @streamns = 'jabber:client' if @streamns == 'jabber:component:accept'

            unless element.attributes['version']  # isn't XMPP compliant, so
              Jabber::debuglog("FEATURES: server not XMPP compliant, will not wait for features")
              @features_sem.run                   # don't wait for <stream:features/>
            end
          when 'features'
            stanza = element
            element.each { |e|
              if e.name == 'mechanisms' and e.namespace == 'urn:ietf:params:xml:ns:xmpp-sasl'
                e.each_element('mechanism') { |mech|
                  @stream_mechanisms.push(mech.text)
                }
              else
                @stream_features[e.name] = e.namespace
              end
            }
            Jabber::debuglog("FEATURES: received")
            @features_sem.run
          else
            stanza = element
        end
      else
        # Any stanza, classes are registered by XMPPElement::name_xmlns
        begin
          stanza = XMPPStanza::import(element)
        rescue NoNameXmlnsRegistered
          stanza = element
        end
      end

      if @xmlcbs.process(stanza)
        @tbcbmutex.synchronize { @processing -= 1 }
        return true
      end

      # Iterate through blocked threads (= waiting for an answer)
      #
      # We're dup'ping the @threadblocks here, so that we won't end up in an
      # endless loop if Stream#send is being nested. That means, the nested
      # threadblock won't receive the stanza currently processed, but the next
      # one.
      threadblocks = nil
      @tbcbmutex.synchronize do
        threadblocks = @threadblocks.dup
      end
      threadblocks.each { |threadblock|
        exception = nil
        r = false
        begin
          r = threadblock.call(stanza)
        rescue Exception => e
          exception = e
        end

        if r == true
          @tbcbmutex.synchronize do
            @threadblocks.delete(threadblock)
          end
          threadblock.wakeup
          @tbcbmutex.synchronize { @processing -= 1 }
          return true
        elsif exception
          @tbcbmutex.synchronize do
            @threadblocks.delete(threadblock)
          end
          threadblock.raise(exception)
        end
      }

      Jabber::debuglog("PROCESSING:\n#{stanza.to_s} (#{stanza.class})")
      Jabber::debuglog("TRYING stanzacbs...")
      if @stanzacbs.process(stanza)
          @tbcbmutex.synchronize { @processing -= 1 }
          return true
      end
      r = false
      Jabber::debuglog("TRYING message/iq/presence/cbs...")
      case stanza
      when Message
        r = @messagecbs.process(stanza)
      when Iq
        r = @iqcbs.process(stanza)
      when Presence
        r = @presencecbs.process(stanza)
      end
      @tbcbmutex.synchronize { @processing -= 1 }
      return r
    end

    ##
    # Get the list of iq callbacks.
    def iq_callbacks
      @iqcbs
    end

    ##
    # Get the list of message callbacks.
    def message_callbacks
      @messagecbs
    end

    ##
    # Get the list of presence callbacks.
    def presence_callbacks
      @presencecbs
    end

    ##
    # Get the list of stanza callbacks.
    def stanza_callbacks
      @stanzacbs
    end

    ##
    # Get the list of xml callbacks.
    def xml_callbacks
      @xmlcbs
    end

    ##
    # This is used by Jabber::Stream internally to
    # keep track of any blocks which were passed to
    # Stream#send.
    class ThreadBlock
      def initialize(block)
        @block = block
        @waiter = Semaphore.new
        @exception = nil
      end
      def call(*args)
        @block.call(*args)
      end
      def wait
        @waiter.wait
        raise @exception if @exception
      end
      def wakeup
        @waiter.run
      end
      def raise(exception)
        @exception = exception
        @waiter.run
      end
    end

    def send_data(data)
      @send_lock.synchronize do
        @last_send = Time.now
        @fd << data
        @fd.flush
      end
    end

    ##
    # Sends XML data to the socket and (optionally) waits
    # to process received data.
    #
    # Do not invoke this in a callback but in a seperate thread
    # because we may not suspend the parser-thread (in whose
    # context callbacks are executed).
    #
    # xml:: [String] The xml data to send
    # &block:: [Block] The optional block
    def send(xml, &block)
      Jabber::debuglog("SENDING:\n#{xml}")
      if block
        threadblock = ThreadBlock.new(block)
        @tbcbmutex.synchronize do
          @threadblocks.unshift(threadblock)
        end
      end
      begin
        # Temporarily remove stanza's namespace to
        # reduce bandwidth consumption
        if xml.kind_of? XMPPStanza and xml.namespace == 'jabber:client' and
            xml.prefix != 'stream' and xml.name != 'stream'
          xml.delete_namespace
          send_data(xml.to_s)
          xml.add_namespace(@streamns)
        else
          send_data(xml.to_s)
        end
      rescue Exception => e
        Jabber::warnlog("EXCEPTION:\n#{e.class}\n#{e.message}\n#{e.backtrace.join("\n")}")

        if @exception_block
          Thread.new do
            Thread.current.abort_on_exception = true
            close!
            @exception_block.call(e, self, :sending)
          end
        else
          Jabber::warnlog "Exception caught while sending! (#{e.class})\n#{e.backtrace.join("\n")}"
          close!
          raise
        end
      end
      # The parser thread might be running this (think of a callback running send())
      # If this is the case, we mustn't stop (or we would cause a deadlock)
      if block and Thread.current != @parser_thread
        threadblock.wait
      elsif block
        Jabber::warnlog("WARNING:\nCannot stop current thread in Jabber::Stream#send because it is the parser thread!")
      end
    end

    ##
    # Send an XMMP stanza with an Jabber::XMPPStanza#id. The id will be
    # generated by Jabber::IdGenerator if not already set.
    #
    # The block will be called once: when receiving a stanza with the
    # same Jabber::XMPPStanza#id. There is no need to return true to
    # complete this! Instead the return value of the block will be
    # returned. This is a direct result of unique request/response
    # stanza identification via the id attribute.
    #
    # The block may be omitted. Then, the result will be the response
    # stanza.
    #
    # Be aware that if a stanza with <tt>type='error'</tt> is received
    # the function does not yield but raises an ServerError with
    # the corresponding error element.
    #
    # Please see Stream#send for some implementational details.
    #
    # Please read the note about nesting at Stream#send
    # xml:: [XMPPStanza]
    def send_with_id(xml, &block)
      if xml.id.nil?
        xml.id = Jabber::IdGenerator.instance.generate_id
      end

      res = nil
      error = nil
      send(xml) do |received|
        if received.kind_of? XMPPStanza and received.id == xml.id
          if received.type == :error
            error = (received.error ? received.error : ErrorResponse.new)
            true
          elsif block_given?
            res = yield(received)
            true
          else
            res = received
            true
          end
        else
          false
        end
      end

      unless error.nil?
        raise ServerError.new(error)
      end

      res
    end

    ##
    # Adds a callback block to process received XML messages, these
    # will be handled before any blocks given to Stream#send or other
    # callbacks.
    #
    # priority:: [Integer] The callback's priority, the higher, the sooner
    # ref:: [String] The callback's reference
    # &block:: [Block] The optional block
    def add_xml_callback(priority = 0, ref = nil, &block)
      @tbcbmutex.synchronize do
        @xmlcbs.add(priority, ref, block)
      end
    end

    ##
    # Delete an XML-messages callback
    #
    # ref:: [String] The reference of the callback to delete
    def delete_xml_callback(ref)
      @tbcbmutex.synchronize do
        @xmlcbs.delete(ref)
      end
    end

    ##
    # Adds a callback block to process received Messages
    #
    # priority:: [Integer] The callback's priority, the higher, the sooner
    # ref:: [String] The callback's reference
    # &block:: [Block] The optional block
    def add_message_callback(priority = 0, ref = nil, &block)
      @tbcbmutex.synchronize do
        @messagecbs.add(priority, ref, block)
      end
    end

    ##
    # Delete an Message callback
    #
    # ref:: [String] The reference of the callback to delete
    def delete_message_callback(ref)
      @tbcbmutex.synchronize do
        @messagecbs.delete(ref)
      end
    end

    ##
    # Adds a callback block to process received Stanzas
    #
    # priority:: [Integer] The callback's priority, the higher, the sooner
    # ref:: [String] The callback's reference
    # &block:: [Block] The optional block
    def add_stanza_callback(priority = 0, ref = nil, &block)
      @tbcbmutex.synchronize do
        @stanzacbs.add(priority, ref, block)
      end
    end

    ##
    # Delete a Stanza callback
    #
    # ref:: [String] The reference of the callback to delete
    def delete_stanza_callback(ref)
      @tbcbmutex.synchronize do
        @stanzacbs.delete(ref)
      end
    end

    ##
    # Adds a callback block to process received Presences
    #
    # priority:: [Integer] The callback's priority, the higher, the sooner
    # ref:: [String] The callback's reference
    # &block:: [Block] The optional block
    def add_presence_callback(priority = 0, ref = nil, &block)
      @tbcbmutex.synchronize do
        @presencecbs.add(priority, ref, block)
      end
    end

    ##
    # Delete a Presence callback
    #
    # ref:: [String] The reference of the callback to delete
    def delete_presence_callback(ref)
      @tbcbmutex.synchronize do
        @presencecbs.delete(ref)
      end
    end

    ##
    # Adds a callback block to process received Iqs
    #
    # priority:: [Integer] The callback's priority, the higher, the sooner
    # ref:: [String] The callback's reference
    # &block:: [Block] The optional block
    def add_iq_callback(priority = 0, ref = nil, &block)
      @tbcbmutex.synchronize do
        @iqcbs.add(priority, ref, block)
      end
    end

    ##
    # Delete an Iq callback
    #
    # ref:: [String] The reference of the callback to delete
    #
    def delete_iq_callback(ref)
      @tbcbmutex.synchronize do
        @iqcbs.delete(ref)
      end
    end
    ##
    # Closes the connection to the Jabber service
    def close
      close!
    end

    def close!
      pr = 1
      n = 0
      # In some cases, we might lost count of some stanzas
      # (for example, if the handler raises an exception)
      # so we can't block forever.
      while pr > 0 and n <= 20
        @tbcbmutex.synchronize { pr = @processing }
        if pr > 0
          n += 1
          Jabber::debuglog("TRYING TO CLOSE, STILL PROCESSING #{pr} STANZAS")
          #puts("TRYING TO CLOSE, STILL PROCESSING #{pr} STANZAS")
          sleep 0.1
        end
      end

      # Order Matters here! If this method is called from within 
      # @parser_thread then killing @parser_thread first would 
      # mean the other parts of the method fail to execute. 
      # That would be bad. So kill parser_thread last
      @fd.close if @fd and !@fd.closed?
      @status = DISCONNECTED
      @parser_thread.kill if @parser_thread
    end
  end
end