require_relative 'thread_pool_executor_shared'
require 'concurrent/executor/thread_pool_executor'

module Concurrent

  RSpec.describe RubyThreadPoolExecutor, :type => :mri do

    after(:each) do
      subject.shutdown
      expect(subject.wait_for_termination(pool_termination_timeout)).to eq true
    end

    subject do
      RubyThreadPoolExecutor.new(
        min_threads: 2,
        max_threads: 5,
        idletime: 60,
        max_queue: 10,
        fallback_policy: :discard
      )
    end

    it_should_behave_like :thread_pool

    it_should_behave_like :thread_pool_executor
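
    # Pruning: the pool is expected to reclaim workers that have been idle for
    # longer than idletime while always keeping at least min_threads alive.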
    context :prune, if: !Concurrent.on_jruby? do # pruning is flaky on JRuby
      subject do
        RubyThreadPoolExecutor.new(idletime: 5, min_threads: 2, max_threads: 10)
      end
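
      # Bookkeeping for a batch of pool threads parked on a condition variable:
      # waiting_threads is the live list (threads remove themselves on wakeup),
      # threads is a snapshot of every worker that picked up a job for the group.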
      Group = Struct.new :waiting_threads, :threads, :mutex, :cond
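
      # Posts `size` jobs that park on the group's condition variable and
      # returns a Group once all of them are waiting.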
      def prepare_thread_group(size)
        cond = ConditionVariable.new
        mutex = Mutex.new
        threads = []

        size.times do
          subject.post do
            mutex.synchronize do
              threads << Thread.current
              cond.wait(mutex)
              threads.delete(Thread.current)
            end
          end
        end

        eventually(mutex: mutex) { expect(threads).to have_attributes(size: size) }

        Group.new(threads, threads.dup, mutex, cond)
      end
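
      # Broadcasts on the group's condition variable and waits until every
      # thread in the group has stopped waiting.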
      def wakeup_thread_group(group)
        group.cond.broadcast
        eventually(mutex: group.mutex) do
          expect(group.waiting_threads).to have_attributes(size: 0)
        end
      end
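
      # Stub Concurrent.monotonic_time so each example can advance the clock
      # deterministically through @now instead of sleeping.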
      before(:each) do
        @now = Concurrent.monotonic_time
        allow(Concurrent).to receive(:monotonic_time) { @now }

        @group1 = prepare_thread_group(5)
        @group2 = prepare_thread_group(5)
      end
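
      # Spin-waits (up to `timeout` seconds) until the block stops raising,
      # optionally holding `mutex` around each attempt; re-raises the last
      # failure if the deadline passes.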
      def eventually(mutex: nil, timeout: 5, &block)
        start = Time.now
        while Time.now - start < timeout
          begin
            if mutex
              mutex.synchronize do
                return yield
              end
            else
              return yield
            end
          rescue Exception => last_failure
          end
          Thread.pass
        end
        raise last_failure
      end
it "triggers pruning when posting work if the last prune happened more than gc_interval ago" do
wakeup_thread_group(@group1)
@now += 6
wakeup_thread_group(@group2)
subject.post { }
eventually { expect(@group1.threads).to all(have_attributes(status: false)) }
eventually { expect(@group2.threads).to all(have_attributes(status: 'sleep')) }
end
it "does not trigger pruning when posting work if the last prune happened less than gc_interval ago" do
wakeup_thread_group(@group1)
@now += 3
subject.prune_pool
@now += 3
wakeup_thread_group(@group2)
subject.post { }
eventually { expect(@group1.threads).to all(have_attributes(status: false)) }
eventually { expect(@group2.threads).to all(have_attributes(status: 'sleep')) }
end
it "reclaims threads that have been idle for more than idletime seconds" do
wakeup_thread_group(@group1)
@now += 6
wakeup_thread_group(@group2)
subject.prune_pool
eventually { expect(@group1.threads).to all(have_attributes(status: false)) }
eventually { expect(@group2.threads).to all(have_attributes(status: 'sleep')) }
end
it "keeps at least min_length workers" do
wakeup_thread_group(@group1)
wakeup_thread_group(@group2)
@now += 12
subject.prune_pool
all_threads = @group1.threads + @group2.threads
eventually do
finished_threads = all_threads.find_all { |t| !t.status }
expect(finished_threads).to have_attributes(size: 8)
end
end
end
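
    # remaining_capacity should track max_queue: equal to it while the queue is
    # empty, and shrinking as jobs queue up behind blocked workers.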
    context '#remaining_capacity' do
      let!(:expected_max) { 100 }
      let(:latch) { Concurrent::CountDownLatch.new }

      subject do
        RubyThreadPoolExecutor.new(
          min_threads: 10,
          max_threads: 20,
          idletime: 60,
          max_queue: expected_max,
          fallback_policy: :discard
        )
      end

      it 'returns max_queue when no tasks are enqueued' do
        5.times { subject.post { nil } }
        subject.post { latch.count_down }
        latch.wait(0.1)
        expect(subject.remaining_capacity).to eq expected_max
      end

      it 'returns the remaining capacity when tasks are enqueued' do
        block = Concurrent::CountDownLatch.new
        100.times { subject.post { block.wait } }
        subject.post { latch.count_down }
        latch.wait(0.1)
        expect(subject.remaining_capacity).to be < expected_max
        block.count_down
      end
    end
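
    # Worker threads are named worker-N, prefixed with the pool's :name option
    # when one is supplied.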
    context 'threads naming' do
      subject do
        opts = { min_threads: 2 }
        opts[:name] = pool_name if pool_name
        described_class.new(opts)
      end

      let(:names) do
        require 'concurrent/set'
        Concurrent::Set.new
      end

      before do
        subject.post(names) { |names| names << Thread.current.name }
        subject.post(names) { |names| names << Thread.current.name }
        subject.shutdown
        subject.wait_for_termination(pool_termination_timeout)
        expect(names.size).to eq 2
      end

      context 'without pool name' do
        let(:pool_name) { }

        it 'sets counted name' do
          expect(names.all? { |name| name =~ /^worker-\d+$/ }).to be true
        end
      end

      context 'with pool name' do
        let(:pool_name) { 'MyExecutor' }

        it 'sets counted name' do
          expect(names.all? { |name| name =~ /^MyExecutor-worker-\d+$/ }).to be true
        end
      end
    end
  end
end