File: container.rb

package info (click to toggle)
qpid-proton 0.37.0-7
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 18,384 kB
  • sloc: ansic: 37,828; cpp: 37,140; python: 15,302; ruby: 6,018; xml: 477; sh: 320; pascal: 52; makefile: 18
file content (488 lines) | stat: -rw-r--r-- 16,939 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.


require 'thread'
require 'set'
require_relative 'listener'
require_relative 'work_queue'

module Qpid::Proton
  public

  # An AMQP container manages a set of {Listener}s and {Connection}s which
  # contain {#Sender} and {#Receiver} links to transfer messages.  Usually, each
  # AMQP client or server process has a single container for all of its
  # connections and links.
  #
  # One or more threads can call {#run}, events generated by all the listeners and
  # connections will be dispatched in the {#run} threads.
  class Container
    include TimeCompare

    # Error raised if the container is used after {#stop} has been called.
    class StoppedError < Qpid::Proton::StoppedError
      def initialize() super("container has been stopped"); end
    end

    # Create a new Container
    # @overload initialize(id=nil)
    #   @param id [String,Symbol] A unique ID for this container, use random UUID if nil.
    #
    # @overload initialize(handler=nil, id=nil)
    #  @param id [String,Symbol] A unique ID for this container, use random UUID if nil.
    #  @param handler [MessagingHandler] Optional default handler for connections
    #   that do not have their own handler (see {#connect} and {#listen})
    #
    #   *Note*: For multi-threaded code, it is recommended to use a separate
    #   handler instance for each connection, as a shared handler may be called
    #   concurrently.
    #
    def initialize(*args)
      @handler, @id = nil
      case args.size
      when 2 then @handler, @id = args
      when 1 then
        @id = String.try_convert(args[0]) || (args[0].to_s if args[0].is_a? Symbol)
        @handler = args[0] unless @id
      when 0 then
      else raise ArgumentError, "wrong number of arguments (given #{args.size}, expected 0..2"
      end
      # Use an empty messaging adapter to give default behaviour if there's no global handler.
      @adapter = Handler::Adapter.adapt(@handler) || Handler::MessagingAdapter.new(nil)
      @id = (@id || SecureRandom.uuid).freeze

      # Threading and implementation notes: see comment on #run_one
      @work = Queue.new
      @work << :start
      @work << :select
      @wake = SelectWaker.new   # Wakes #run thread in IO.select
      @auto_stop = true         # Stop when @active drops to 0
      @work_queue = WorkQueue.new(self)  # work scheduled by other threads for :select context

      # Following instance variables protected by lock
      @lock = Mutex.new
      @active = 0               # All active tasks, in @selectable, @work or being processed
      @selectable = Set.new     # Tasks ready to block in IO.select
      @running = 0              # Count of #run threads
      @stopped = false          # #stop called
      @stop_err = nil           # Optional error to pass to tasks, from #stop
      @panic = nil              # Exception caught in a run thread, to be raised by all run threads
    end

    # @return [MessagingHandler] The container-wide handler
    attr_reader :handler

    # @return [String] unique identifier for this container
    attr_reader :id

    def to_s() "#<#{self.class} id=#{id.inspect}>"; end
    def inspect() to_s; end

    # Auto-stop flag.
    #
    # True (the default) means that the container will stop automatically, as if {#stop}
    # had been called, when the last listener or connection closes.
    #
    # False means {#run} will not return unless {#stop} is called.
    #
    # @return [Bool] auto-stop state
    attr_accessor :auto_stop

    # True if the container has been stopped and can no longer be used.
    # @return [Bool] stopped state
    attr_accessor :stopped

    # Number of threads in {#run}
    # @return [Bool] {#run} thread count
    def running() @lock.synchronize { @running }; end

    # Open an AMQP connection.
    #
    # @param url [String, URI] Open a {TCPSocket} to url.host, url.port.
    # url.scheme must be "amqp" or "amqps", url.scheme.nil? is treated as "amqp"
    # url.user, url.password are used as defaults if opts[:user], opts[:password] are nil
    # @option (see Connection#open)
    # @return [Connection] The new AMQP connection
    def connect(url, opts=nil)
      not_stopped
      url = Qpid::Proton::uri url
      opts ||= {}
      if url.user ||  url.password
        opts[:user] ||= url.user
        opts[:password] ||= url.password
      end
      opts[:ssl_domain] ||= SSLDomain.new(SSLDomain::MODE_CLIENT) if url.scheme == "amqps"
      connect_io(TCPSocket.new(url.host, url.port), opts)
    end

    # Open an AMQP protocol connection on an existing {IO} object
    # @param io [IO] An existing {IO} object, e.g. a {TCPSocket}
    # @option (see Connection#open)
    def connect_io(io, opts=nil)
      not_stopped
      cd = connection_driver(io, opts)
      cd.connection.open()
      add(cd)
      cd.connection
    end

    # Listen for incoming AMQP connections
    #
    # @param url [String,URI] Listen on host:port of the AMQP URL
    # @param handler [Listener::Handler] A {Listener::Handler} object that will be called
    # with events for this listener and can generate a new set of options for each one.
    # @return [Listener] The AMQP listener.
    #
    def listen(url, handler=Listener::Handler.new)
      not_stopped
      url = Qpid::Proton::uri url
      # TODO aconway 2017-11-01: amqps, SSL
      listen_io(TCPServer.new(url.host, url.port), handler)
    end

    # Listen for incoming AMQP connections on an existing server socket.
    # @param io A server socket, for example a {TCPServer}
    # @param handler [Listener::Handler] Handler for events from this listener
    #
    def listen_io(io, handler=Listener::Handler.new)
      not_stopped
      l = ListenTask.new(io, handler, self)
      add(l)
      l.listener
    end

    # Run the container: wait for IO activity, dispatch events to handlers.
    #
    # *Multi-threaading* : More than one thread can call {#run} concurrently,
    # the container will use all {#run} threads as a thread pool. Calls to
    # {MessagingHandler} or {Listener::Handler} methods are serialized for each
    # connection or listener. See {WorkQueue} for coordinating with other
    # threads.
    #
    # *Exceptions*: If any handler method raises an exception it will stop the
    # container, and the exception will be raised by all calls to {#run}. For
    # single threaded code this is often desirable. Multi-threaded server
    # applications should normally rescue exceptions in the handler and deal
    # with them in another way: logging, closing the connection with an error
    # condition, signalling another thread etc.
    #
    # @return [void] Returns when the container stops, see {#stop} and {#auto_stop}
    #
    # @raise [StoppedError] If the container has already been stopped when {#run} was called.
    #
    # @raise [Exception] If any {MessagingHandler} or {Listener::Handler} managed by
    #   the container raises an exception, that exception will be raised by {#run}
    #
    def run
      @lock.synchronize do
        @running += 1        # Note: ensure clause below will decrement @running
        raise StoppedError if @stopped
      end
      while task = @work.pop
        run_one(task, Time.now)
      end
      @lock.synchronize { raise @panic if @panic }
    ensure
      @lock.synchronize do
        if (@running -= 1) > 0
          work_wake nil         # Signal the next thread
        else
          # This is the last thread, no need to do maybe_panic around this final handler call.
          @adapter.on_container_stop(self) if @adapter.respond_to? :on_container_stop
        end
      end
    end

    # Stop the container.
    #
    # Close all listeners and abort all connections without doing AMQP protocol close.
    #
    # {#stop} returns immediately, calls to {#run} will return when all activity
    # is finished.
    #
    # The container can no longer be used, using a stopped container raises
    # {StoppedError}.  Create a new container if you want to resume activity.
    #
    # @param error [Condition] Optional error condition passed to
    #  {MessagingHandler#on_transport_error} for each connection and
    #  {Listener::Handler::on_error} for each listener.
    #
    # @param panic [Exception] Optional exception to raise from all calls to run()
    #
    def stop(error=nil, panic=nil)
      @lock.synchronize do
        return if @stopped
        @stop_err = Condition.convert(error)
        @panic = panic
        @stopped = true
        check_stop_lh
        # NOTE: @stopped =>
        # - no new run threads can join
        # - no more select calls after next wakeup
        # - once @active == 0, all threads will be stopped with nil
      end
      wake
    end

    # Get the {WorkQueue} that can be used to schedule code to be run by the container.
    #
    # Note: to run code that affects a {Connection} or it's associated objects,
    # use {Connection#work_queue}
    def work_queue() @work_queue; end

    # (see WorkQueue#schedule)
    def schedule(at, &block) @work_queue.schedule(at, &block) end

    private

    def wake() @wake.wake; end

    class ConnectionTask < Qpid::Proton::HandlerDriver
      include TimeCompare

      def initialize container, io, opts, server=false
        super io, opts[:handler]
        transport.set_server if server
        transport.apply opts
        connection.apply opts
        @work_queue = WorkQueue.new(container)
        connection.instance_variable_set(:@work_queue, @work_queue)
      end
      def next_tick() earliest(super, @work_queue.next_tick); end
      def process(now) @work_queue.process(now); super(); end

      def dispatch              # Intercept dispatch to close work_queue
        super
        @work_queue.close if read_closed? && write_closed?
      end
    end

    class ListenTask < Listener

      def initialize(io, handler, container)
        @io, @handler = io, handler
        @listener = Listener.new(io, container)
        env = ENV['PN_TRACE_EVT']
        if env && ["true", "1", "yes", "on"].include?(env.downcase)
          @log_prefix = "[0x#{object_id.to_s(16)}](PN_LISTENER_"
        else
          @log_prefix = nil
        end
        dispatch(:on_open);
      end

      attr_reader :listener
      def closing?() @listener.instance_variable_get(:@closing); end
      def condition() @listener.instance_variable_get(:@condition); end
      def closed?() @io.closed?; end

      def process
        return if closed?
        unless closing?
          begin
            return @io.accept, dispatch(:on_accept)
          rescue IO::WaitReadable, Errno::EINTR
          rescue StandardError => e
            @listener.close(e)
          end
        end
      ensure
        if closing?
          @io.close rescue nil
          @listener.instance_variable_set(:@closed, true)
          dispatch(:on_error, condition) if condition
          dispatch(:on_close)
        end
      end

      def can_read?() !finished?; end
      def can_write?() false; end
      def finished?() closed?; end

      def dispatch(method, *args)
        # TODO aconway 2017-11-27: better logging
        STDERR.puts "#{@log_prefix}#{([method[3..-1].upcase]+args).join ', '})" if @log_prefix
        @handler.__send__(method, self, *args) if @handler && @handler.respond_to?(method)
      end

      def next_tick() nil; end

      # Close listener and force immediate close of socket
      def close(e=nil)
        @listener.close(e)
        @io.close rescue nil
      end
    end

    # Selectable object that can be used to wake IO.select from another thread
    class SelectWaker
      def initialize
        @rd, @wr = IO.pipe
        @lock = Mutex.new
        @set = false
      end

      def to_io() @rd; end

      def wake
        @lock.synchronize do
          return if @set        # Don't write if already has data
          @set = true
          @wr.write_nonblock('x') rescue nil
        end
      end

      def reset
        @lock.synchronize do
          return unless @set
          @rd.read_nonblock(1) rescue nil
          @set = false
        end
      end

      def close
        @rd.close
        @wr.close
      end
    end

    # Handle a single item from the @work queue, this is the heart of the #run loop.
    # Take one task from @work, process it, and rearm for select
    # Tasks are: ConnectionTask, ListenTask, :start, :select
    # - ConnectionTask/ListenTask have #can_read, #can_write, #next_tick to set up IO.select
    #   and #process to run handlers and process relevant work_queue
    # - nil means exit from the  #run thread exit (handled by #run)
    # - :select does IO.select and processes Container#work_queue
    def run_one(task, now)
      case task

      when :start
        maybe_panic { @adapter.on_container_start(self) } if @adapter.respond_to? :on_container_start

      when :select
        # Compute read/write select sets and minimum next_tick for select timeout
        r, w = [@wake], []
        next_tick = @work_queue.next_tick
        @lock.synchronize do
          @selectable.each do |s|
            r << s if s.can_read?
            w << s if s.can_write?
            next_tick = earliest(s.next_tick, next_tick)
          end
        end

        timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick
        r, w = IO.select(r, w, nil, timeout)
        @wake.reset if r && r.delete(@wake)
        now = Time.now unless timeout == 0 # Update now if we may have blocked

        # selected is a Set to eliminate duplicates between r, w and next_tick due.
        selected = Set.new
        selected.merge(r) if r
        selected.merge(w) if w
        stopped = @lock.synchronize do
          if @stopped           # close everything
            @selectable.each { |s| s.close @stop_err; @work << s }
            @selectable.clear
            @work_queue.close
            @wake.close
          else
            @selectable -= selected # Remove already-selected tasks from @selectable
            # Also select and remove items with next_tick before now
            @selectable.delete_if { |s| before_eq(s.next_tick, now) and selected << s }
          end
          @stopped
        end
        selected.each { |s| @work << s } # Queue up tasks needing #process
        maybe_panic { @work_queue.process(now) } # Process current work queue items
        @work_queue.clear if stopped
        @lock.synchronize { check_stop_lh } if @work_queue.empty?

        @work << :select  unless stopped # Enable next select

      when ConnectionTask then
        maybe_panic { task.process now }
        rearm task

      when ListenTask then
        io, opts = maybe_panic { task.process }
        add(connection_driver(io, opts, true)) if io
        rearm task
      end
    end

    # Rescue any exception raised by the block and stop the container.
    def maybe_panic
      begin
        yield
      rescue Exception => e
        stop(nil, e)
        nil
      end
    end

    # Normally if we add work we need to set a wakeup to ensure a single #run
    # thread doesn't get stuck in select while there is other work on the queue.
    def work_wake(task)
      @work << task
      @wake.wake
    end

    def connection_driver(io, opts=nil, server=false)
      opts ||= {}
      opts[:container] = self
      opts[:handler] ||= @adapter
      ConnectionTask.new(self, io, opts, server)
    end

    # All new tasks are added here
    def add task
      @lock.synchronize do
        @active += 1
        task.close @stop_err if @stopped
      end
      work_wake task
    end

    def rearm task
      @lock.synchronize do
        if task.finished?
          @active -= 1
          check_stop_lh
        elsif @stopped
          task.close @stop_err
          work_wake task
        else
          @selectable << task
        end
      end
      @wake.wake
    end

    def check_stop_lh
      if @active.zero? && (@auto_stop || @stopped) && @work_queue.empty?
        @stopped = true
        work_wake nil          # Signal threads to stop
        true
      end
    end

    def not_stopped() raise StoppedError if @lock.synchronize { @stopped }; end

  end
end