File: test_tools.rb

package info (click to toggle)
qpid-proton 0.37.0-7
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 18,384 kB
  • sloc: ansic: 37,828; cpp: 37,140; python: 15,302; ruby: 6,018; xml: 477; sh: 320; pascal: 52; makefile: 18
file content (172 lines) | stat: -rw-r--r-- 4,801 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.


# Tools for tests. Only minitest is used.

require 'minitest/autorun'
require 'qpid_proton'
require 'socket'

# Compatibility shims so that tests can always refer to MiniTest::Test:
#  - minitest >= 5.19 only defines the old MiniTest spelling when MT_COMPAT is
#    set, so restore the alias from Minitest when it is missing;
#  - older versions of MiniTest have no Test class, only Unit::TestCase.
MiniTest = Minitest unless defined?(MiniTest)
begin
  MiniTest::Test
rescue NameError                # For older versions of MiniTest
  MiniTest::Test = MiniTest::Unit::TestCase
end

# Normal error: a RuntimeError, so a bare +rescue+ catches it
class TestError < RuntimeError
end

# Not caught by default rescue: derives directly from Exception
class TestException < Exception
end

# Block until a TCP connection to +port+ can be opened, then close it again.
# The connection is attempted with an empty host string.
#
# @param port [Integer] TCP port to probe
# @param timeout [Numeric] how many seconds to keep retrying refused connections
# @raise [TestError] if the connection is still refused once +timeout+ has elapsed
def wait_port(port, timeout=5)
  deadline = Time.now + timeout
  begin  # Wait for the port to be connectible
    # Probe the +port+ argument; the previous code read an unrelated global
    TCPSocket.open("", port).close
  rescue Errno::ECONNREFUSED
    raise TestError, "timed out waiting for port #{port}" if Time.now > deadline
    sleep(0.1)                  # Short pause so the retry loop does not spin
    retry
  end
end

# Messaging handler for tests: it records the endpoints that were opened, the
# messages that arrived and the error conditions that were reported
class TestHandler < Qpid::Proton::MessagingHandler
  attr_reader :errors, :connections, :sessions, :links, :messages

  # @param raise_errors if true raise an exception for error events, if false, store them in #errors
  def initialize(raise_errors=true)
    super()
    @raise_errors = raise_errors
    @errors = []
    @connections = []
    @sessions = []
    @links = []
    @messages = []
  end

  # Raise a TestError carrying the text of every recorded error, one per line.
  # The recorded errors are removed (last recorded first) while the text is
  # built. Does nothing when no errors have been recorded.
  def raise_errors
    return if @errors.empty?
    text = ""
    text << @errors.pop + "\n" until @errors.empty?
    raise TestError, "TestHandler has errors:\n #{text}"
  end

  # Record the condition as a string; raise straight away when the handler
  # was created with raise_errors set
  def on_error(condition)
    @errors << "#{condition}"
    raise_errors if @raise_errors
  end

  # Common recording step for all endpoint-open events
  def endpoint_open(queue, endpoint)
    queue << endpoint
  end

  def on_connection_open(c)
    endpoint_open(@connections, c)
  end

  def on_session_open(s)
    endpoint_open(@sessions, s)
  end

  # Receivers and senders are both recorded in #links
  def on_receiver_open(l)
    endpoint_open(@links, l)
  end

  def on_sender_open(l)
    endpoint_open(@links, l)
  end

  # Only the message is recorded, the delivery is ignored
  def on_message(d, m)
    @messages << m
  end
end

# Listener handler that closes its Listener once the n-th connection (by
# default the first) has been accepted
class ListenOnceHandler < Qpid::Proton::Listener::Handler
  # @param opts passed unchanged to the parent handler's constructor
  # @param n number of accepts after which the listener is closed
  def initialize(opts, n=1)
    super(opts)
    @n = n
  end

  # Listener errors are not handled here, they are raised to the caller
  def on_error(l, e)
    raise e
  end

  # Count one accept, close the listener when the count reaches zero, then
  # defer to the parent handler
  def on_accept(l)
    @n -= 1
    l.close if @n.zero?
    super
  end
end

# Add url to Listener, assuming a TCP socket
class Qpid::Proton::Listener
  # AMQP URL with an empty host part and this listener's port
  def url
    "amqp://:#{port}"
  end
end

# A client/server pair of ConnectionDrivers linked by a socket pair
DriverPair = Struct.new(:client, :server) do

  # Create both drivers over the two ends of a local stream socket pair and
  # put the second driver's transport in server mode
  def initialize(client_handler, server_handler)
    s = Socket.pair(:LOCAL, :STREAM, 0)
    self.client = HandlerDriver.new(s[0], client_handler)
    self.server = HandlerDriver.new(s[1], server_handler)
    server.transport.set_server
  end

  # Process each driver once, return time of next timed event
  # @param now time passed to each driver's #process
  # @param max_time optional upper bound for the returned time
  # @return the earliest of the times returned by the drivers and +max_time+,
  #   or nil when no driver returned a time and no +max_time+ was given
  def process(now = Time.now, max_time=nil)
    times = map { |d| d.process(now) }
    times << max_time
    # compact drops the nil entries, so a driver without a timed event (or a
    # missing max_time) never takes part in the comparison
    times.compact.min
  end

  # Poll (zero timeout) whether any driver that wants to read or write can do
  # so right now; returns the IO.select result, which is nil when none can
  def active
    can_read = self.select { |d| d.can_read? }
    can_write = self.select { |d| d.can_write? }
    IO.select(can_read, can_write, [], 0)
  end

  def names
    map { |d| d.handler.names }
  end

  def clear
    each { |d| d.handler.clear }
  end

  # Run until there is nothing else to do. Does not wait for timed events,
  # but passes +now+ to process and returns the earliest timed event time
  # that was returned (nil if there was none)
  def run(now=Time.now)
    t = nil
    begin
      t = process(now, t)
    end while active
    t
  end
end

# Container that listens on a random port
class ServerContainer < Qpid::Proton::Container
  include Qpid::Proton

  attr_reader :listener

  # @param id passed to the parent constructor as its second argument
  # @param listener_opts options handed to the ListenOnceHandler
  # @param n number of accepts before the ListenOnceHandler closes the listener
  # @param handler passed to the parent constructor as its first argument
  def initialize(id=nil, listener_opts=nil, n=1, handler=nil)
    super(handler, id)
    # Port 0 lets the system pick the port
    server_socket = TCPServer.open(0)
    accept_handler = ListenOnceHandler.new(listener_opts, n)
    @listener = listen_io(server_socket, accept_handler)
  end

  # Port reported by the listener
  def port
    @listener.port
  end

  # AMQP URL with an empty host part and the listening port
  def url
    "amqp://:#{port}"
  end
end

# ServerContainer that starts #run in a new thread as soon as it is constructed
class ServerContainerThread < ServerContainer
  attr_reader :thread

  # Takes the same arguments as ServerContainer and forwards them unchanged
  def initialize(*args)
    super
    @thread = Thread.new do
      run
    end
  end

  # Wait for the thread started by the constructor to finish
  def join
    @thread.join
  end
end