# Specs for the in_parallel gem: run_in_parallel, run_in_background,
# wait_for_processes and Enumerable#each_in_parallel.
require 'rspec'
require 'in_parallel'
include InParallel
# Path of the scratch file the specs create and delete. File.join places the
# file inside the freshly created temporary directory; plain string
# concatenation would glue the file name onto the directory name and create
# the file next to the directory instead of inside it.
TMP_FILE = File.join(Dir.mktmpdir, 'test_file.txt').freeze
# Small fixture object carrying a fixed array. The specs attach singleton
# methods to its instances (see get_singleton_class) and read the array back
# through get_test_data.
class SingletonTest
  # Seeds the instance with the array [1, 2, 3].
  def initialize
    @test_data = (1..3).to_a
  end

  # Returns the array stored at construction time.
  def get_test_data
    return @test_data
  end
end
# Fixture used by the run_in_parallel specs: keeps an object that has a
# singleton method (built by get_singleton_class) in an instance variable and
# also writes state onto the wrapper's own singleton class.
class SingletonWrapper
# Stores the object returned by get_singleton_class, then opens this
# instance's singleton class with class_eval and assigns two variables there.
def initialize
@instance_var = get_singleton_class
singleton_class.class_eval do
# NOTE(review): class variables are resolved lexically, so @@x presumably
# ends up on SingletonWrapper rather than on the singleton class - confirm.
@@x = "foo"
# Inside class_eval self is the singleton class, so @x is an instance
# variable of that singleton class object, not of the wrapper instance.
@x = 'bar'
end
end
# Returns the object that initialize obtained from get_singleton_class.
def get_instance_var
@instance_var
end
end
# Builds and returns a new SingletonWrapper fixture.
def get_wrapper
  wrapper = SingletonWrapper.new
  wrapper
end
# Returns a new SingletonTest instance that additionally carries a singleton
# method `someval` (which returns "someval").
def get_singleton_class
  fixture = SingletonTest.new
  # Define the method on this one object's singleton class only.
  class << fixture
    def someval
      "someval"
    end
  end
  fixture
end
# Helper functions for the unit tests
# Prints "foo" and a "bar + <param>" line to stdout, then returns the string
# "bar + <param>" (without the trailing space/newline that is printed).
#
# @param param [#to_s] value interpolated into the output and the result
# @return [String] "bar + " followed by the interpolated param
def method_with_param(param)
  puts "foo"
  puts "bar + #{param} \n"
  "bar + #{param}"
end
# Prints the hash { :foo => "bar" } to stdout and returns that same hash.
#
# @return [Hash] the hash that was printed
def method_without_param
  { :foo => "bar" }.tap { |ret_val| puts ret_val }
end
# Writes the given value to standard output, followed by a newline when the
# value does not already end in one.
#
# @param my_string [Object] value to print
# @return [nil]
def simple_puts(my_string)
  $stdout.puts(my_string)
end
# Sleeps for `wait` seconds, then (over)writes the file at `file_path` with
# the text 'contents'.
#
# @param file_path [String] path of the file to write
# @param wait [Numeric] seconds to sleep before writing (default 2)
# @return [true] always true once the file has been written
def create_file_with_delay(file_path, wait=2)
  sleep(wait)
  File.write(file_path, 'contents')
  true
end
# Returns the id of the process this method is executed in.
#
# @return [Integer] the current process id
def get_pid
  Process.pid
end
# Always fails: raises a StandardError with the message 'An error occurred'.
#
# @raise [StandardError] unconditionally
def raise_an_error
  raise StandardError, 'An error occurred'
end
#Tests
# Specs for run_in_parallel: instance-variable assignments made inside the
# block are expected to be executed in other processes and to have their
# values available once the call returns.
describe '.run_in_parallel' do
  # Start every example without a leftover scratch file.
  before do
    File.delete(TMP_FILE) if File.exist?(TMP_FILE)
  end

  it 'should run methods in another process' do
    run_in_parallel do
      @result = get_pid
      @result2 = get_pid
    end

    # Each assignment must have run outside this process, and the two
    # assignments must not have shared a process.
    expect(@result).to_not eq(Process.pid)
    expect(@result2).to_not eq(Process.pid)
    expect(@result).to_not eq(@result2)
  end

  it 'should return correct values' do
    run_in_parallel do
      @result_from_test = method_with_param('blah')
      @result_2 = method_without_param
    end

    # return values for instance variables should be set correctly
    expect(@result_from_test).to eq 'bar + blah'
    # should be able to return objects (not just strings)
    expect(@result_2).to eq({ :foo => "bar" })
  end

  it "should return large results" do
    # 2**16 = 64k is typical buffer size
    long_string = 'a' * (2**16 + 1)

    expect do
      # The first positional argument (presumably the timeout in seconds -
      # confirm against the in_parallel API) is passed as a plain value; Ruby
      # has no `name=value` call syntax, that form only assigns a stray local.
      run_in_parallel(1) do
        @result = method_with_param(long_string)
      end
    end.not_to raise_error

    expect(@result).to eq "bar + #{long_string}"
  end

  it "should return a singleton class value" do
    run_in_parallel { @result = get_singleton_class }

    expect(@result.get_test_data).to eq([1, 2, 3])
  end

  it "should return an object with an instance variable set to an object containing singleton methods" do
    run_in_parallel { @result = get_wrapper }

    expect(@result.get_instance_var.get_test_data).to eq([1, 2, 3])
  end

  # Disabled example (xit): second argument true is the kill_all_on_error case.
  xit "should raise an exception and return immediately with kill_all_on_error and one of the processes errors." do
    expect {
      run_in_parallel(nil, true) do
        @result = get_singleton_class
        @result_2 = raise_an_error
        @result_3 = create_file_with_delay(TMP_FILE)
      end
    }.to raise_error StandardError

    expect(@result_3).to_not eq(true)
  end

  it "should raise an exception and let all processes complete when one of the processes errors." do
    expect {
      run_in_parallel(nil, false) do
        @result = get_singleton_class
        @result_2 = raise_an_error
        @result_3 = create_file_with_delay(TMP_FILE)
      end
    }.to raise_error StandardError

    # The slow sibling must still have finished despite the error.
    expect(@result_3).to eq(true)
  end

  it "should not run in parallel if forking is not supported" do
    # Reset @@supported to nil before stubbing Process.respond_to?(:fork);
    # presumably this forces fork support to be detected again - confirm
    # against InParallelExecutor.
    InParallel::InParallelExecutor.class_variable_set(:@@supported, nil)
    expect(Process).to receive(:respond_to?).with(:fork).and_return(false).once
    expect(InParallel::InParallelExecutor.logger).to receive(:warn).with("Warning: Fork is not supported on this OS, executing block normally")

    run_in_parallel do
      @result_from_test = method_with_param('blah')
      @result_2 = get_pid
    end

    expect(@result_from_test).to eq 'bar + blah'
    # Without fork the block runs in this very process.
    expect(@result_2).to eq Process.pid
  end

  # it "should chunk stdout per process" do
  # expect {run_in_parallel {
  # simple_puts('foobar')
  # }}.to output(/------ Begin output for simple_puts.*foobar.*------ Completed output for simple_puts/).to_stdout
  # end
end
# Specs for run_in_background: the block is expected to keep running after
# the call returns, and its results can be collected with wait_for_processes
# when the first argument (ignore_results) is false.
describe '.run_in_background' do
  # Start every example without a leftover scratch file.
  before do
    File.delete(TMP_FILE) if File.exist?(TMP_FILE)
  end

  it 'should run in the background' do
    run_in_background { @result = create_file_with_delay(TMP_FILE) }
    start = Time.now

    # Should not exist immediately upon block completion
    expect(File.exist?(TMP_FILE)).to eq false

    # Give this some time to complete since it takes longer on the vmpooler vms
    file_exists = false
    while Time.now < start + 10 do
      if File.exist?(TMP_FILE)
        file_exists = true
        break
      end
      # Pause between checks so the poll does not spin a CPU core for up to
      # ten seconds and compete with the background process it waits for.
      sleep 0.1
    end

    # Should exist once the delay in create_file_with_delay is done
    expect(file_exists).to eq true
  end

  it 'should allow you to get results if ignore_results is false' do
    @block_result = run_in_background(false) { @result = create_file_with_delay(TMP_FILE) }
    wait_for_processes

    # We should get the correct value assigned for the method result
    expect(@result).to eq true
  end
end
# Specs for wait_for_processes: waiting on a background block that sleeps for
# 30 seconds must raise RuntimeError once a 0.1 second timeout is exceeded.
describe '.wait_for_processes' do
# Put the executor-wide default timeout back to 1200 after every example,
# because the first example below overrides it.
after do
InParallel::InParallelExecutor.parallel_default_timeout = 1200
end
it 'should timeout when the default timeout value is hit' do
@block_result = run_in_background(false) do
@result = create_file_with_delay(TMP_FILE, 30)
end
# No timeout is passed to wait_for_processes here, so the shortened default
# set on the executor is the one expected to trigger the error.
InParallel::InParallelExecutor.parallel_default_timeout = 0.1
expect { wait_for_processes }.to raise_error RuntimeError
end
it 'should timeout when a specified timeout value is hit' do
@block_result = run_in_background(false) do
@result = create_file_with_delay(TMP_FILE, 30)
@result2 = method_without_param
end
# The timeout is passed explicitly instead of changing the default.
expect { wait_for_processes(0.1) }.to raise_error RuntimeError
end
end
# Specs for each_in_parallel: each element's block is expected to run in its
# own process, with the block results returned as an array in element order.
describe '.each_in_parallel' do
  it 'should run each iteration in a separate process' do
    pids = [1, 2, 3].each_in_parallel { Process.pid }

    # No pid may appear more than once in the results.
    expect(pids.detect { |pid| pids.count(pid) > 1 }).to be_nil
  end

  it 'should return correct values' do
    start_time = Time.now
    items = [1,2,3,4,5].each_in_parallel do |item|
      sleep(Random.rand(1.0))
      item * 2
    end

    # return values should be an array of the returned items in the last line of the block, in correct order
    expect(items).to eq([2,4,6,8,10])
    # time should be less than the maximum combined delay of the 5 block
    # calls (each sleeps for under one second)
    expect(Time.now - start_time).to be < 5
  end

  it 'should run each iteration of a map in parallel' do
    items = [1,2,3].map.each_in_parallel do |item|
      puts item
      item * 2
    end

    # return values should be an array of the returned items in the last line of the block, in correct order
    expect(items).to eq([2,4,6])
  end

  it 'should return an empty array and do nothing with an empty enumerator' do
    result = [].each_in_parallel do |item|
      raise "Incorrectly called the block with an empty enumerator"
    end

    expect(result).to eq []
  end

  it 'should return the result of the block with only 1 item in the enumerator' do
    expect([1].each_in_parallel do |item|
      item * 2
    end).to eq([2])
  end

  it 'should not run in parallel if there is only 1 item in the enumerator' do
    expect(InParallel::InParallelExecutor.logger).to_not receive(:info).with(/Forked process for/)

    # The block must report the pid of this process, i.e. nothing was forked.
    expect(["foo"].map.each_in_parallel { Process.pid }[0]).to eq(Process.pid)
  end

  it 'should allow you to specify the method_sym' do
    # Let unrelated info messages through; only the fork messages are counted.
    allow(InParallel::InParallelExecutor.logger).to receive(:info).with(anything())
    expect(InParallel::InParallelExecutor.logger).to receive(:info).with(/Forked process for my_method/).exactly(3).times

    [1, 2, 3].each_in_parallel('my_method') { |item|
      puts item
    }
  end
end
# End of in_parallel specs.