1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
|
require 'helper'
require 'sidekiq/processor'
# Exercises Sidekiq::Processor against a mocked boss: job execution,
# exception propagation, argument handling, and the processed/failed
# counters the processor writes to Redis.
class TestProcessor < Sidekiq::Test
  TestException = Class.new(StandardError)
  # Single shared instance raised by MockWorker when it is given 'boom'.
  TEST_EXCEPTION = TestException.new("kerboom!")

  describe 'with mock setup' do
    before do
      # Global counter incremented by every MockWorker#perform that completes.
      $invokes = 0
      @boss = Minitest::Mock.new
      @processor = ::Sidekiq::Processor.new(@boss)
      Celluloid.logger = nil
      Sidekiq.redis = REDIS
    end

    # Worker double: raises TEST_EXCEPTION for 'boom', otherwise records the call.
    class MockWorker
      include Sidekiq::Worker

      def perform(args)
        raise TEST_EXCEPTION if args == 'boom'
        # Mutates array arguments; presumably present so that the
        # 'does not modify original arguments' example has a mutation to guard against.
        args.pop if args.is_a? Array
        $invokes += 1
      end
    end

    # Wraps a serialized job payload in the unit-of-work object passed to
    # Processor#process.
    #
    # msg   - JSON string describing the job.
    # queue - queue name the work is attributed to.
    def work(msg, queue='queue:default')
      Sidekiq::BasicFetch::UnitOfWork.new(queue, msg)
    end

    it 'processes as expected' do
      msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] })
      actor = Minitest::Mock.new
      actor.expect(:processor_done, nil, [@processor])
      actor.expect(:real_thread, nil, [nil, Thread])
      # One #async call is expected per message sent to the boss.
      @boss.expect(:async, actor, [])
      @boss.expect(:async, actor, [])

      @processor.process(work(msg))

      @boss.verify
      assert_equal 1, $invokes
    end

    it 'executes a worker as expected' do
      worker = Minitest::Mock.new
      worker.expect(:perform, nil, [1, 2, 3])

      @processor.execute_job(worker, [1, 2, 3])

      # Without verify the example would pass even if #perform were never called.
      worker.verify
    end

    it 'passes exceptions to ExceptionHandler' do
      actor = Minitest::Mock.new
      actor.expect(:real_thread, nil, [nil, Thread])
      @boss.expect(:async, actor, [])
      msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] })

      assert_raises(TestException, "Expected #process to raise exception") do
        @processor.process(work(msg))
      end
      assert_equal 0, $invokes
    end

    it 're-raises exceptions after handling' do
      msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] })
      actor = Minitest::Mock.new
      actor.expect(:real_thread, nil, [nil, Thread])
      @boss.expect(:async, actor, [])

      assert_raises(TestException, "does not re-raise exceptions after handling") do
        @processor.process(work(msg))
      end
    end

    it 'does not modify original arguments' do
      msg = { 'class' => MockWorker.to_s, 'args' => [['myarg']] }
      msgstr = Sidekiq.dump_json(msg)
      processor = ::Sidekiq::Processor.new(@boss)
      actor = Minitest::Mock.new
      actor.expect(:processor_done, nil, [processor])
      actor.expect(:real_thread, nil, [nil, Thread])
      @boss.expect(:async, actor, [])
      @boss.expect(:async, actor, [])

      processor.process(work(msgstr))

      # NOTE(review): only the serialized string reaches the processor, so this
      # checks the pre-serialization hash; confirm this is the intended target.
      assert_equal [['myarg']], msg['args']
    end

    describe 'stats' do
      before do
        # Start every stats example from an empty Redis database.
        Sidekiq.redis {|c| c.flushdb }
      end

      # Runs the given block with Sidekiq::Processor::STATS_TIMEOUT temporarily
      # replaced by +time+, restoring the previous value afterwards even if the
      # block raises.
      def with_expire(time)
        old = Sidekiq::Processor::STATS_TIMEOUT
        silence_warnings { Sidekiq::Processor.const_set(:STATS_TIMEOUT, time) }
        yield
      ensure
        silence_warnings { Sidekiq::Processor.const_set(:STATS_TIMEOUT, old) }
      end

      describe 'when successful' do
        let(:processed_today_key) { "stat:processed:#{Time.now.utc.to_date}" }

        # Pushes one job through the processor that completes without raising.
        def successful_job
          msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] })
          actor = Minitest::Mock.new
          actor.expect(:real_thread, nil, [nil, Thread])
          actor.expect(:processor_done, nil, [@processor])
          @boss.expect(:async, actor, [])
          @boss.expect(:async, actor, [])
          @processor.process(work(msg))
        end

        it 'increments processed stat' do
          successful_job
          assert_equal 1, Sidekiq::Stats.new.processed
        end

        it 'expires processed stat' do
          successful_job
          ttl = Sidekiq.redis { |conn| conn.ttl(processed_today_key) }
          # The TTL counts down in real time, so allow one second of drift
          # between the key being written and being read back.
          assert_in_delta Sidekiq::Processor::STATS_TIMEOUT, ttl, 1
        end

        it 'increments date processed stat' do
          successful_job
          assert_equal 1, Sidekiq.redis { |conn| conn.get(processed_today_key) }.to_i
        end
      end

      describe 'when failed' do
        let(:failed_today_key) { "stat:failed:#{Time.now.utc.to_date}" }

        # Pushes one job through the processor that raises TestException; the
        # exception is swallowed here so examples can inspect the recorded stats.
        def failed_job
          actor = Minitest::Mock.new
          actor.expect(:real_thread, nil, [nil, Thread])
          @boss.expect(:async, actor, [])
          msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] })
          begin
            @processor.process(work(msg))
          rescue TestException
          end
        end

        it 'increments failed stat' do
          failed_job
          assert_equal 1, Sidekiq::Stats.new.failed
        end

        it 'increments date failed stat' do
          failed_job
          assert_equal 1, Sidekiq.redis { |conn| conn.get(failed_today_key) }.to_i
        end

        it 'expires failed stat' do
          failed_job
          ttl = Sidekiq.redis { |conn| conn.ttl(failed_today_key) }
          # The TTL counts down in real time, so allow one second of drift
          # between the key being written and being read back.
          assert_in_delta Sidekiq::Processor::STATS_TIMEOUT, ttl, 1
        end
      end
    end
  end
end
|