File: tc_tcp_pipelining.rb

package info (click to toggle)
dnsruby 1.61.5-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 1,520 kB
  • sloc: ruby: 17,811; makefile: 3
file content (255 lines) | stat: -rw-r--r-- 8,206 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
# --
# Copyright 2015 Verisign
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ++

require_relative 'spec_helper'
require_relative 'test_dnsserver'

# The TCPPipeliningServer links our NioTcpPipeliningHandler on
# the loopback interface.
class TCPPipeliningServer < Async::DNS::Server
  PORT     = 53937
  IP   = '127.0.0.1'

  DEFAULT_MAX_REQUESTS = 4
  DEFAULT_TIMEOUT = 3

  @@stats = Stats.new

  def self.stats
    @@stats
  end

  def initialize(**options)
    super(options)

    @handlers = []
    @handlers << NioTcpPipeliningHandler.new(self, IP, PORT, DEFAULT_MAX_REQUESTS, DEFAULT_TIMEOUT) #4 max request
  end

  def process(name, resource_class, transaction)
    @logger.debug "name: #{name}"
    transaction.respond!("93.184.216.34", { resource_class: ::Resolv::DNS::Resource::IN::A })
  end

end

class TestTCPPipelining < Minitest::Test

  class << self
    attr_accessor :query_id
  end

  def self.init
    unless @initialized
      @initialized = true
      @query_id = 0
    end
  end

  @@server = nil

  def setup
  return
    self.class.init

    # Instantiate a new server that uses our tcp pipelining handler
    # For each query the server sends the query upstream (193.0.14.129)
    options = {
        server_class: TCPPipeliningServer,
    }

    #RubyDNS::run_server(options) || true
    if !@@server
      @@server = TCPPipeliningServer.new()

      Thread.new do
        @@server.run
      end
    end

    # Instantiate our resolver. The resolver will use the same pipeline as much as possible.
    # If a timeout occurs or max_request_per_connection a new connection should be initiated
    @@resolver ||= Dnsruby::Resolver.new(
        use_tcp:                    true,
        do_caching:                 false,
        tcp_pipelining:             true,
        dnssec:                     false,
        packet_timeout:             10,
        tcp_pipelining_max_queries: 10,
        nameserver:                 TCPPipeliningServer::IP,
        port:                       TCPPipeliningServer::PORT)
  end

  # Send x number of queries asynchronously to our resolver
  def send_async_messages(number_of_messages, queue, wait_seconds = 0)
    Dnsruby.log.debug "Sending #{number_of_messages} messages"
    number_of_messages.times do
      name = "#{self.class.query_id}.com"
      Dnsruby.log.debug "Sending #{name}"
      message = Dnsruby::Message.new(name)
      # self.class.query_id identifies our query, must be different for each message
      @@resolver.send_async(message, queue, self.class.query_id)
      self.class.query_id += 1

      # Note: For 0, we don't sleep at all instead of sleeping 0 since sleeping 0
      # involves yielding the CPU.
      sleep wait_seconds unless wait_seconds == 0
    end
  end

  # Verify x responses with no exception
  def verify_responses(number_of_messages, queue)
    number_of_messages.times do
      _response_id, response, exception = queue.pop
      assert_nil(exception)
      assert(response.is_a?(Dnsruby::Message))
    end
  end

  def accept_wait(accept_count, max)
    i = 0
    while TCPPipeliningServer.stats.accept_count < accept_count
      sleep 0.5
      i+=0.5
      assert(i<max, "Max wait for accept reached #{TCPPipeliningServer.stats.accept_count} accepts < #{accept_count}")
    end
  end

  def connection_wait(connection_count, max)
    i = 0
    while TCPPipeliningServer.stats.connections > connection_count
      sleep 0.5
      i+=0.5
      assert(i<max, "Max wait for connection reached: #{TCPPipeliningServer.stats.connections} active connections > #{connection_count}")
    end
  end

  def timeout_wait(timeout_count, max)
    i = 0
    while TCPPipeliningServer.stats.timeout_count < timeout_count
      sleep 0.5
      i+=0.5
      assert(i<max, "Max wait for timeout reached #{TCPPipeliningServer.stats.timeout_count} timeounts < #{timeout_count}")
    end
  end

  # This test initiates multiple asynchronous requests and verifies they go on the same tcp
  # pipeline or a new one depending on timeouts
  def test_TCP_pipelining_timeout
  return
    Dnsruby.log.debug "test_TCP_pipelining_timeout"
    connection_wait(0, TCPPipeliningServer::DEFAULT_TIMEOUT*5)

    accept_count  = TCPPipeliningServer.stats.accept_count
    timeout_count = TCPPipeliningServer.stats.timeout_count

    # This is the main queue used to communicate between Dnsruby in async mode and the client
    query_queue = Queue.new

    # Test basic pipelining. All request should go on the same tcp connection.
    # TCPPipeliningServer.stats.accept_count should be 1.
    send_async_messages(3, query_queue)
    verify_responses(3, query_queue)

    assert_equal(accept_count + 1, TCPPipeliningServer.stats.accept_count)

    connection_wait(0, TCPPipeliningServer::DEFAULT_TIMEOUT*5)
    timeout_wait(timeout_count + 1, TCPPipeliningServer::DEFAULT_TIMEOUT*5)

    assert_equal(timeout_count + 1, TCPPipeliningServer.stats.timeout_count)

    # Initiate another 3 queries, check accept_count and timeout_count
    send_async_messages(3, query_queue)
    verify_responses(3, query_queue)

    connection_wait(0, TCPPipeliningServer::DEFAULT_TIMEOUT*5)
    timeout_wait(timeout_count + 2, TCPPipeliningServer::DEFAULT_TIMEOUT*5)

    assert_equal(accept_count + 2, TCPPipeliningServer.stats.accept_count)
    assert_equal(timeout_count + 2, TCPPipeliningServer.stats.timeout_count)

    connection_wait(0, TCPPipeliningServer::DEFAULT_TIMEOUT*5)
  end

  # Test timeout occurs and new connection is initiated inbetween 2 sends
  def test_TCP_pipelining_timeout_in_send
  return
    Dnsruby.log.debug "test_TCP_pipelining_timeout_in_send"
    connection_wait(0, TCPPipeliningServer::DEFAULT_TIMEOUT*5)

    accept_count  = TCPPipeliningServer.stats.accept_count
    timeout_count = TCPPipeliningServer.stats.timeout_count

    query_queue = Queue.new

    # Initiate another 2 queries wait and then send a final query
    # Check accept_count. Wait for timeout and verify we got 2 additional timeouts.
    send_async_messages(2, query_queue)
    verify_responses(2, query_queue)

    accept_wait(accept_count+1, TCPPipeliningServer::DEFAULT_TIMEOUT*5)
    connection_wait(0, TCPPipeliningServer::DEFAULT_TIMEOUT*5)

    send_async_messages(1, query_queue)

    verify_responses(1, query_queue)

    assert_equal(accept_count + 2, TCPPipeliningServer.stats.accept_count)

    connection_wait(0, TCPPipeliningServer::DEFAULT_TIMEOUT*5)

    timeout_wait(timeout_count + 2, TCPPipeliningServer::DEFAULT_TIMEOUT*5)
  end

  # Test that we get a SocketEofResolvError if the servers closes the socket before
  # all queries are answered
  def test_TCP_pipelining_socket_eof
  return
    connection_wait(0, TCPPipeliningServer::DEFAULT_TIMEOUT*5)

    query_queue = Queue.new

    # Issue 6 queries. Only 4 should be replied since max_request_per_connection = 4
    # Verify we get Dnsruby::SocketEofResolvError on the last 2.
    # Verify we got max_count was incremented
    send_async_messages(6, query_queue)

    responses = []

    6.times do
      response = query_queue.pop
      responses << response
    end

    responses.sort_by { |response| response[0] }

    step = 0

    responses.each do | response |
      _response_id, response, exception = response
      if step < TCPPipeliningServer::DEFAULT_MAX_REQUESTS
        assert_nil(exception, "Exception not nil for msg #{step} < #{TCPPipeliningServer::DEFAULT_MAX_REQUESTS} requests")
        assert(response.is_a?(Dnsruby::Message))
      else
        assert_equal(Dnsruby::SocketEofResolvError, exception.class)
        assert_nil(response)
      end
      step += 1
    end

    connection_wait(0, TCPPipeliningServer::DEFAULT_TIMEOUT*5)
  end
end