File: integration.rb

package info (click to toggle)
puma 6.6.0-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,884 kB
  • sloc: ruby: 17,542; ansic: 2,003; java: 1,006; sh: 379; makefile: 10
file content (576 lines) | stat: -rw-r--r-- 17,797 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
# frozen_string_literal: true

require "puma/control_cli"
require "json"
require "open3"
require_relative 'tmp_path'

# Only single mode tests go here. Cluster and pumactl tests
# have their own files, use those instead
class TestIntegration < TimeoutTestCase
  include TmpPath
  HOST  = "127.0.0.1"
  TOKEN = "xxyyzz"
  RESP_READ_LEN = 65_536
  RESP_READ_TIMEOUT = 10
  RESP_SPLIT = "\r\n\r\n"

  # used in wait_for_server_to_* methods
  LOG_TIMEOUT   = Puma::IS_JRUBY ? 20 : 10
  LOG_WAIT_READ = Puma::IS_JRUBY ? 5 : 2
  LOG_ERROR_SLEEP = 0.2
  LOG_ERROR_QTY   = 5

  # rubyopt requires bundler/setup, so we don't need it here
  BASE = "#{Gem.ruby} -Ilib"

  def setup
    @server = nil
    @config_file = nil
    @server_log = +''
    @pid = nil
    @ios_to_close = []
    @bind_path    = tmp_path('.sock')
    @control_path     = nil
    @control_tcp_port = nil
  end

  def teardown
    if @server && @control_tcp_port && Puma.windows?
      cli_pumactl 'stop'
    elsif @server && @pid && !Puma.windows?
      stop_server @pid, signal: :INT
    end

    @ios_to_close&.each do |io|
      begin
        io.close if io.respond_to?(:close) && !io.closed?
      rescue
      ensure
        io = nil
      end
    end

    if @bind_path
      refute File.exist?(@bind_path), "Bind path must be removed after stop"
      File.unlink(@bind_path) rescue nil
    end

    # wait until the end for OS buffering?
    if @server
      begin
        @server.close unless @server.closed?
      rescue
      ensure
        @server = nil
      end
    end

    if @config_file
      File.unlink(@config_file.path) rescue nil
      @config_file = nil
    end
  end

  private

  def silent_and_checked_system_command(*args)
    assert(system(*args, out: File::NULL, err: File::NULL))
  end

  def with_unbundled_env
    bundler_ver = Gem::Version.new(Bundler::VERSION)
    if bundler_ver < Gem::Version.new('2.1.0')
      Bundler.with_clean_env { yield }
    else
      Bundler.with_unbundled_env { yield }
    end
  end

  def cli_server(argv,  # rubocop:disable Metrics/ParameterLists
      unix: false,      # uses a UNIXSocket for the server listener when true
      config: nil,      # string to use for config file
      no_bind: nil,     # bind is defined by args passed or config file
      merge_err: false, # merge STDERR into STDOUT
      log: false,       # output server log to console (for debugging)
      no_wait: false,   # don't wait for server to boot
      puma_debug: nil,  # set env['PUMA_DEBUG'] = 'true'
      env: {})          # pass env setting to Puma process in IO.popen

    if config
      @config_file = Tempfile.create(%w(config .rb))
      @config_file.syswrite config
      # not supported on some OS's, all GitHub Actions OS's support it
      @config_file.fsync rescue nil
      @config_file.close
      config = "-C #{@config_file.path}"
    end

    puma_path = File.expand_path '../../../bin/puma', __FILE__

    cmd =
      if no_bind
        "#{BASE} #{puma_path} #{config} #{argv}"
      elsif unix
        "#{BASE} #{puma_path} #{config} -b unix://#{@bind_path} #{argv}"
      else
        @tcp_port = UniquePort.call
        @bind_port = @tcp_port
        "#{BASE} #{puma_path} #{config} -b tcp://#{HOST}:#{@tcp_port} #{argv}"
      end

    env['PUMA_DEBUG'] = 'true' if puma_debug

    STDOUT.syswrite "\n#{full_name}\n  #{cmd}\n" if log

    if merge_err
      @server = IO.popen(env, cmd, :err=>[:child, :out])
    else
      @server = IO.popen(env, cmd)
    end
    @pid = @server.pid
    wait_for_server_to_boot(log: log) unless no_wait
    @server
  end

  # rescue statements are just in case method is called with a server
  # that is already stopped/killed, especially since Process.wait2 is
  # blocking
  def stop_server(pid = @pid, signal: :TERM)
    begin
      Process.kill signal, pid
    rescue Errno::ESRCH
    end
    begin
      Process.wait2 pid
    rescue Errno::ECHILD
    end
  end

  # Most integration tests do not stop/shutdown the server, which is handled by
  # `teardown` in this file.
  # For tests that do stop/shutdown the server, use this method to check with `wait2`,
  # and also clear variables so `teardown` will not run its code.
  def wait_server(exit_code = 0, pid: @pid)
    return unless pid
    begin
      _, status = Process.wait2 pid
      status = status.exitstatus % 128 if ::Puma::IS_JRUBY
      assert_equal exit_code, status
    rescue Errno::ECHILD # raised on Windows ?
    end
  ensure
    @server.close unless @server.closed?
    @server = nil
  end

  def restart_server_and_listen(argv, env: {}, log: false)
    cli_server argv, env: env, log: log
    connection = connect
    initial_reply = read_body(connection)
    restart_server connection, log: log
    [initial_reply, read_body(connect)]
  end

  # reuses an existing connection to make sure that works
  def restart_server(connection, log: false)
    Process.kill :USR2, @pid
    wait_for_server_to_include 'Restarting', log: log
    connection.write "GET / HTTP/1.1\r\n\r\n" # trigger it to start by sending a new request
    wait_for_server_to_boot log: log
  end

  # wait for server to say it booted
  # @server and/or @server.gets may be nil on slow CI systems
  def wait_for_server_to_boot(timeout: LOG_TIMEOUT, log: false)
    @puma_pid = wait_for_server_to_match(/(?:Master|      ) PID: (\d+)$/, 1, timeout: timeout, log: log)&.to_i
    @pid = @puma_pid if @pid != @puma_pid
    wait_for_server_to_include 'Ctrl-C', timeout: timeout, log: log
  end

  # Returns true if and when server log includes str.  Will timeout otherwise.
  def wait_for_server_to_include(str, timeout: LOG_TIMEOUT, log: false)
    time_timeout = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    line = ''

    puts "\n——— #{full_name} waiting for '#{str}'" if log
    line = server_gets(str, time_timeout, log: log) until line&.include?(str)
    true
  end

  # Returns line if and when server log matches re, unless idx is specified,
  # then returns regex match.  Will timeout otherwise.
  def wait_for_server_to_match(re, idx = nil, timeout: LOG_TIMEOUT, log: false)
    time_timeout = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    line = ''

    puts "\n——— #{full_name} waiting for '#{re.inspect}'" if log
    line = server_gets(re, time_timeout, log: log) until line&.match?(re)
    idx ? line[re, idx] : line
  end

  def server_gets(match_obj, time_timeout, log: false)
    error_retries = 0
    line = ''

    sleep 0.05 until @server.is_a?(IO) || Process.clock_gettime(Process::CLOCK_MONOTONIC) > time_timeout

    raise Minitest::Assertion,  "@server is not an IO" unless @server.is_a?(IO)
    if Process.clock_gettime(Process::CLOCK_MONOTONIC) > time_timeout
      raise Minitest::Assertion, "Timeout waiting for server to log #{match_obj.inspect}"
    end

    begin
      if @server.wait_readable(LOG_WAIT_READ) and line = @server&.gets
        @server_log << line
        puts "    #{line}" if log
      end
    rescue StandardError => e
      error_retries += 1
      raise(e, "Waiting for server to log #{match_obj.inspect}") if error_retries == LOG_ERROR_QTY
      sleep LOG_ERROR_SLEEP
      retry
    end
    if Process.clock_gettime(Process::CLOCK_MONOTONIC) > time_timeout
      raise Minitest::Assertion, "Timeout waiting for server to log #{match_obj.inspect}"
    end
    line
  end

  def connect(path = nil, unix: false)
    s = unix ? UNIXSocket.new(@bind_path) : TCPSocket.new(HOST, @tcp_port)
    @ios_to_close << s
    s << "GET /#{path} HTTP/1.1\r\n\r\n"
    s
  end

  # use only if all socket writes are fast
  # does not wait for a read
  def fast_connect(path = nil, unix: false)
    s = unix ? UNIXSocket.new(@bind_path) : TCPSocket.new(HOST, @tcp_port)
    @ios_to_close << s
    fast_write s, "GET /#{path} HTTP/1.1\r\n\r\n"
    s
  end

  def fast_write(io, str)
    n = 0
    while true
      begin
        n = io.syswrite str
      rescue Errno::EAGAIN, Errno::EWOULDBLOCK => e
        unless io.wait_writable 5
          raise e
        end

        retry
      rescue Errno::EPIPE, SystemCallError, IOError => e
        raise e
      end

      return if n == str.bytesize
      str = str.byteslice(n..-1)
    end
  end

  def read_body(connection, timeout = nil)
    read_response(connection, timeout).last
  end

  def read_response(connection, timeout = nil)
    timeout ||= RESP_READ_TIMEOUT
    content_length = nil
    chunked = nil
    response = +''
    t_st = Process.clock_gettime Process::CLOCK_MONOTONIC
    if connection.to_io.wait_readable timeout
      loop do
        begin
          part = connection.read_nonblock(RESP_READ_LEN, exception: false)
          case part
          when String
            unless content_length || chunked
              chunked ||= part.include? "\r\nTransfer-Encoding: chunked\r\n"
              content_length = (t = part[/^Content-Length: (\d+)/i , 1]) ? t.to_i : nil
            end

            response << part
            hdrs, body = response.split RESP_SPLIT, 2
            unless body.nil?
              # below could be simplified, but allows for debugging...
              ret =
                if content_length
                  body.bytesize == content_length
                elsif chunked
                  body.end_with? "\r\n0\r\n\r\n"
                elsif !hdrs.empty? && !body.empty?
                  true
                else
                  false
                end
              if ret
                return [hdrs, body]
              end
            end
            sleep 0.000_1
          when :wait_readable, :wait_writable # :wait_writable for ssl
            sleep 0.000_2
          when nil
            raise EOFError
          end
          if timeout < Process.clock_gettime(Process::CLOCK_MONOTONIC) - t_st
            raise Timeout::Error, 'Client Read Timeout'
          end
        end
      end
    else
      raise Timeout::Error, 'Client Read Timeout'
    end
  end

  # gets worker pids from @server output
  def get_worker_pids(phase = 0, size = workers, log: false)
    pids = []
    re = /PID: (\d+)\) booted in [.0-9]+s, phase: #{phase}/
    while pids.size < size
      if pid = wait_for_server_to_match(re, 1, log: log)
        pids << pid
      end
    end
    pids.map(&:to_i)
  end

  # used to define correct 'refused' errors
  def thread_run_refused(unix: false)
    if unix
      DARWIN ? [IOError, Errno::ENOENT, Errno::EPIPE, Errno::EBADF] :
               [IOError, Errno::ENOENT]
    else
      # Errno::ECONNABORTED is thrown intermittently on TCPSocket.new
      # Errno::ECONNABORTED is thrown by Windows on read or write
      DARWIN ? [IOError, Errno::ECONNREFUSED, Errno::EPIPE, Errno::EBADF, EOFError, Errno::ECONNABORTED] :
               [IOError, Errno::ECONNREFUSED, Errno::EPIPE, Errno::ECONNABORTED]
    end
  end

  def set_pumactl_args(unix: false)
    if unix
      @control_path = tmp_path('.cntl_sock')
      "--control-url unix://#{@control_path} --control-token #{TOKEN}"
    else
      @control_tcp_port = UniquePort.call
      "--control-url tcp://#{HOST}:#{@control_tcp_port} --control-token #{TOKEN}"
    end
  end

  def cli_pumactl(argv, unix: false, no_bind: nil)
    arg =
      if no_bind
        argv.split(/ +/)
      elsif @control_path && !@control_tcp_port
        %W[-C unix://#{@control_path} -T #{TOKEN} #{argv}]
      elsif @control_tcp_port && !@control_path
        %W[-C tcp://#{HOST}:#{@control_tcp_port} -T #{TOKEN} #{argv}]
      end

    r, w = IO.pipe
    @ios_to_close << r
    Puma::ControlCLI.new(arg, w, w).run
    w.close
    r
  end

  def cli_pumactl_spawn(argv, unix: false, no_bind: nil)
    arg =
      if no_bind
        argv
      elsif @control_path && !@control_tcp_port
        %Q[-C unix://#{@control_path} -T #{TOKEN} #{argv}]
      elsif @control_tcp_port && !@control_path
        %Q[-C tcp://#{HOST}:#{@control_tcp_port} -T #{TOKEN} #{argv}]
      end

    pumactl_path = File.expand_path '../../../bin/pumactl', __FILE__

    cmd = "#{BASE} #{pumactl_path} #{arg}"

    io = IO.popen(cmd, :err=>[:child, :out])
    @ios_to_close << io
    io
  end

  def get_stats
    read_pipe = cli_pumactl "stats"
    read_pipe.wait_readable 2
    # `split("\n", 2).last` removes "Command stats sent success" line
    JSON.parse read_pipe.read.split("\n", 2).last
  end

  def restart_does_not_drop_connections(
      num_threads: 1,
      total_requests: 500,
      config: nil,
      unix: nil,
      signal: nil,
      log: nil
    )
    skipped = true
    skip_if :jruby, suffix: <<-MSG
 - file descriptors are not preserved on exec on JRuby; connection reset errors are expected during restarts
    MSG
    skip_if :truffleruby, suffix: ' - Undiagnosed failures on TruffleRuby'

    clustered = (workers || 0) >= 2

    args = "-w #{workers} -t 5:5 -q test/rackup/hello_with_delay.ru"
    if Puma.windows?
      cli_server "#{set_pumactl_args} #{args}", unix: unix, config: config, log: log
    else
      cli_server args, unix: unix, config: config, log: log
    end

    skipped = false
    replies = Hash.new 0
    refused = thread_run_refused unix: false
    message = 'A' * 16_256  # 2^14 - 128

    mutex = Mutex.new
    restart_count = 0
    client_threads = []

    num_requests = (total_requests/num_threads).to_i

    num_threads.times do |thread|
      client_threads << Thread.new do
        num_requests.times do |req_num|
          begin
            begin
              socket = unix ? UNIXSocket.new(@bind_path) : TCPSocket.new(HOST, @tcp_port)
              fast_write socket, "POST / HTTP/1.1\r\nContent-Length: #{message.bytesize}\r\n\r\n#{message}"
            rescue => e
              replies[:write_error] += 1
              raise e
            end
            body = read_body(socket, 10)
            if body == "Hello World"
              mutex.synchronize {
                replies[:success] += 1
                replies[:restart] += 1 if restart_count > 0
              }
            else
              mutex.synchronize { replies[:unexpected_response] += 1 }
            end
          rescue Errno::ECONNRESET, Errno::EBADF, Errno::ENOTCONN
            # connection was accepted but then closed
            # client would see an empty response
            # Errno::EBADF Windows may not be able to make a connection
            mutex.synchronize { replies[:reset] += 1 }
          rescue *refused, IOError
            # IOError intermittently thrown by Ubuntu, add to allow retry
            mutex.synchronize { replies[:refused] += 1 }
          rescue ::Timeout::Error
            mutex.synchronize { replies[:read_timeout] += 1 }
          ensure
            if socket.is_a?(IO) && !socket.closed?
              begin
                socket.close
              rescue Errno::EBADF
              end
            end
          end
        end
        # STDOUT.puts "#{thread} #{replies[:success]}"
      end
    end

    run = true

    restart_thread = Thread.new do
      # Wait for some connections before first restart
      sleep 0.2
      while run
        if Puma.windows?
          cli_pumactl 'restart'
        else
          Process.kill signal, @pid
        end
        if signal == :USR2
          # If 'wait_for_server_to_boot' times out, error in thread shuts down CI
          begin
            wait_for_server_to_boot timeout: 5
          rescue Minitest::Assertion # Timeout
            run = false
          end
        end
        restart_count += 1

        if Puma.windows?
          sleep 2.0
        elsif clustered
          phase = signal == :USR2 ? 0 : restart_count
          # If 'get_worker_pids phase' times out, error in thread shuts down CI
          begin
            get_worker_pids phase, log: log
            # Wait with an exponential backoff before signaling next restart
            sleep 0.15 * (2 ** restart_count)
          rescue Minitest::Assertion # Timeout
            run = false
          end
        else
          sleep 0.1
        end
      end
    end

    # cycle thru threads rather than one at a time
    until client_threads.empty?
      client_threads.each_with_index do |t, i|
        client_threads[i] = nil if t.join(1)
      end
      client_threads.compact!
    end

    run = false
    restart_thread.join
    if Puma.windows?
      cli_pumactl 'stop'
      wait_server
    else
      stop_server
    end
    @server = nil

    msg = ("   %4d unexpected_response\n"   % replies.fetch(:unexpected_response,0)).dup
    msg << "   %4d refused\n"               % replies.fetch(:refused,0)
    msg << "   %4d read timeout\n"          % replies.fetch(:read_timeout,0)
    msg << "   %4d reset\n"                 % replies.fetch(:reset,0)
    msg << "   %4d write_errors\n"          % replies.fetch(:write_error,0)
    msg << "   %4d success\n"               % replies.fetch(:success,0)
    msg << "   %4d success after restart\n" % replies.fetch(:restart,0)
    msg << "   %4d restart count\n"         % restart_count

    actual_requests = num_threads * num_requests
    allowed_errors = (actual_requests * 0.002).round

    refused = replies[:refused]
    reset   = replies[:reset]

    assert_operator restart_count, :>=, 2, msg

    if Puma.windows?
      assert_equal actual_requests - reset - refused, replies[:success]
    else
      assert_operator replies[:success], :>=,  actual_requests - allowed_errors, msg
    end

  ensure
    return if skipped
    if passed?
      msg = "    #{restart_count} restarts, #{reset} resets, #{refused} refused, #{replies[:restart]} success after restart, #{replies[:write_error]} write error"
      $debugging_info << "#{full_name}\n#{msg}\n"
    else
      client_threads.each { |thr| thr.kill if thr.is_a? Thread }
      $debugging_info << "#{full_name}\n#{msg}\n"
    end
  end
end