1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
|
require 'helper'
require 'sidekiq/manager'
class TestManager < Sidekiq::Test
describe 'manager' do
before do
Sidekiq.redis = REDIS
Sidekiq.redis {|c| c.flushdb }
end
it 'creates N processor instances' do
mgr = Sidekiq::Manager.new(options)
assert_equal options[:concurrency], mgr.ready.size
assert_equal [], mgr.busy
end
it 'assigns work to a processor' do
uow = Object.new
processor = Minitest::Mock.new
processor.expect(:async, processor, [])
processor.expect(:process, nil, [uow])
mgr = Sidekiq::Manager.new(options)
mgr.ready << processor
mgr.assign(uow)
assert_equal 1, mgr.busy.size
processor.verify
end
it 'requeues work if stopping' do
uow = Minitest::Mock.new
uow.expect(:requeue, nil, [])
mgr = Sidekiq::Manager.new(options)
mgr.fetcher = Sidekiq::BasicFetch.new({:queues => []})
mgr.stop
mgr.assign(uow)
uow.verify
end
it 'shuts down the system' do
mgr = Sidekiq::Manager.new(options)
mgr.fetcher = Sidekiq::BasicFetch.new({:queues => []})
mgr.stop
assert mgr.busy.empty?
assert mgr.ready.empty?
end
it 'returns finished processors to the ready pool' do
fetcher = MiniTest::Mock.new
fetcher.expect :async, fetcher, []
fetcher.expect :fetch, nil, []
mgr = Sidekiq::Manager.new(options)
mgr.fetcher = fetcher
init_size = mgr.ready.size
processor = mgr.ready.pop
mgr.busy << processor
mgr.processor_done(processor)
assert_equal 0, mgr.busy.size
assert_equal init_size, mgr.ready.size
fetcher.verify
end
it 'throws away dead processors' do
fetcher = MiniTest::Mock.new
fetcher.expect :async, fetcher, []
fetcher.expect :fetch, nil, []
mgr = Sidekiq::Manager.new(options)
mgr.fetcher = fetcher
init_size = mgr.ready.size
processor = mgr.ready.pop
mgr.busy << processor
mgr.processor_died(processor, 'ignored')
assert_equal 0, mgr.busy.size
assert_equal init_size, mgr.ready.size
refute mgr.ready.include?(processor)
fetcher.verify
end
describe 'heartbeat' do
before do
uow = Object.new
@processor = Minitest::Mock.new
@processor.expect(:async, @processor, [])
@processor.expect(:process, nil, [uow])
@mgr = Sidekiq::Manager.new(options)
@mgr.ready << @processor
@mgr.assign(uow)
@processor.verify
@proctitle = $0
end
after do
$0 = @proctitle
end
describe 'when manager is active' do
before do
@mgr.heartbeat('identity', heartbeat_data, Sidekiq.dump_json(heartbeat_data))
end
it 'sets useful info to proctitle' do
assert_equal "sidekiq #{Sidekiq::VERSION} myapp [1 of 3 busy]", $0
end
it 'stores process info in redis' do
info = Sidekiq.redis { |c| c.hmget('identity', 'busy') }
assert_equal ["1"], info
expires = Sidekiq.redis { |c| c.pttl('identity') }
assert_in_delta 60000, expires, 10
end
end
describe 'when manager is stopped' do
before do
@processor.expect(:alive?, [])
@processor.expect(:terminate, [])
@mgr.stop
@mgr.processor_done(@processor)
@mgr.heartbeat('identity', heartbeat_data, Sidekiq.dump_json(heartbeat_data))
@processor.verify
end
it 'indicates stopping status in proctitle' do
assert_equal "sidekiq #{Sidekiq::VERSION} myapp [0 of 3 busy] stopping", $0
end
it 'stores process info in redis' do
info = Sidekiq.redis { |c| c.hmget('identity', 'busy') }
assert_equal ["0"], info
expires = Sidekiq.redis { |c| c.pttl('identity') }
assert_in_delta 60000, expires, 50
end
end
end
def options
{ :concurrency => 3, :queues => ['default'] }
end
def heartbeat_data
{ 'concurrency' => 3, 'tag' => 'myapp' }
end
end
end
|