1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197
|
#--
#
# Author:: Tsutomu Katsube.
# Copyright:: Copyright (c) 2025 Tsutomu Katsube. All rights reserved.
# License:: Ruby license.
require "socket"
require_relative "process-test-result"
require_relative "sub-test-result"
require_relative "test-process-run-context"
require_relative "test-suite-runner"
module Test
module Unit
class TestSuiteProcessRunner < TestSuiteRunner
MAIN_TO_WORKER_INPUT_FILENO = 3
WORKER_TO_MAIN_OUTPUT_FILENO = 4
class << self
class Worker
attr_reader :main_to_worker_output, :worker_to_main_input
def initialize(pid, main_to_worker_output, worker_to_main_input)
@pid = pid
@main_to_worker_output = main_to_worker_output
@worker_to_main_input = worker_to_main_input
end
def receive
Marshal.load(@worker_to_main_input)
end
def send(data)
Marshal.dump(data, @main_to_worker_output)
@main_to_worker_output.flush
end
def wait
begin
Process.waitpid(@pid)
ensure
begin
@main_to_worker_output.close
ensure
@worker_to_main_input.close
end
end
end
end
def run_all_tests(result, options)
n_workers = TestSuiteRunner.n_workers
test_suite = options[:test_suite]
start_tcp_server do |tcp_server|
workers = []
begin
n_workers.times do |i|
load_paths = options[:load_paths]
base_directory = options[:base_directory]
test_paths = options[:test_paths]
command_line = [Gem.ruby, File.join(__dir__, "process-worker.rb")]
load_paths.each do |load_path|
command_line << "--load-path" << load_path
end
unless base_directory.nil?
command_line << "--base-directory" << base_directory
end
command_line << "--worker-id" << (i + 1).to_s
if Gem.win_platform?
local_address = tcp_server.local_address
command_line << "--ip-address" << local_address.ip_address
command_line << "--ip-port" << local_address.ip_port.to_s
end
command_line.concat(test_paths)
if Gem.win_platform?
# On Windows, file descriptors 3 and above cannot be passed to
# child processes.
pid = spawn(*command_line)
data_socket = tcp_server.accept
workers << Worker.new(pid, data_socket, data_socket)
else
main_to_worker_input, main_to_worker_output = IO.pipe
worker_to_main_input, worker_to_main_output = IO.pipe
pid = spawn(*command_line, {MAIN_TO_WORKER_INPUT_FILENO => main_to_worker_input,
WORKER_TO_MAIN_OUTPUT_FILENO => worker_to_main_output})
main_to_worker_input.close
worker_to_main_output.close
workers << Worker.new(pid, main_to_worker_output, worker_to_main_input)
end
end
run_context = TestProcessRunContext.new(self)
yield(run_context)
run_context.progress_block.call(TestSuite::STARTED, test_suite.name)
run_context.progress_block.call(TestSuite::STARTED_OBJECT, test_suite)
worker_inputs = workers.collect(&:worker_to_main_input)
until run_context.test_names.empty? do
select_each_worker(worker_inputs, workers) do |_, worker, data|
case data[:status]
when :ready
test_name = run_context.test_names.shift
break if test_name.nil?
worker.send(test_name)
when :result
add_result(result, data)
when :event
emit_event(options[:event_listener], data)
end
end
end
workers.each do |worker|
worker.send(nil)
end
until worker_inputs.empty? do
select_each_worker(worker_inputs, workers) do |worker_to_main_input, worker, data|
case data[:status]
when :result
add_result(result, data)
when :event
emit_event(options[:event_listener], data)
when :done
worker_inputs.delete(worker_to_main_input)
worker.send(nil)
end
end
end
ensure
workers.each do |worker|
worker.wait
end
end
run_context.progress_block.call(TestSuite::FINISHED, test_suite.name)
run_context.progress_block.call(TestSuite::FINISHED_OBJECT, test_suite)
end
end
private
def start_tcp_server
if Gem.win_platform?
TCPServer.open("127.0.0.1", 0) do |tcp_server|
yield(tcp_server)
end
else
yield
end
end
def select_each_worker(worker_inputs, workers)
readables, = IO.select(worker_inputs)
readables.each do |worker_to_main_input|
worker = workers.find do |w|
w.worker_to_main_input == worker_to_main_input
end
data = worker.receive
yield(worker_to_main_input, worker, data)
end
end
def add_result(result, data)
action = data[:action]
args = data[:args]
result.__send__(action, *args)
end
def emit_event(event_listener, data)
event_name = data[:event_name]
args = data[:args]
event_listener.call(event_name, *args)
end
end
def run(worker_context, &progress_block)
worker_context.run_context.progress_block = progress_block
run_tests_recursive(@test_suite, worker_context, &progress_block)
end
private
def run_tests_recursive(test_suite, worker_context, &progress_block)
run_context = worker_context.run_context
if test_suite.have_fixture?
run_context.test_names << test_suite.name
else
test_suite.tests.each do |test|
if test.is_a?(TestSuite)
run_tests_recursive(test, worker_context, &progress_block)
else
run_context.test_names << test.name
end
end
end
end
end
end
end
|