# frozen_string_literal: true
require_relative 'helper'
require 'sidekiq/fetch'
require 'sidekiq/cli'
require 'sidekiq/processor'
class TestProcessor < Sidekiq::Test
  TestException = Class.new(StandardError)
  TEST_EXCEPTION = TestException.new("kerboom!")

  describe 'processor' do
    before do
      $invokes = 0
      @mgr = Minitest::Mock.new
      @mgr.expect(:options, {:queues => ['default']})
      @mgr.expect(:options, {:queues => ['default']})
      @processor = ::Sidekiq::Processor.new(@mgr)
    end

    class MockWorker
      include Sidekiq::Worker
      def perform(args)
        raise TEST_EXCEPTION if args == 'boom'
        args.pop if args.is_a? Array
        $invokes += 1
      end
    end

    def work(msg, queue='queue:default')
      Sidekiq::BasicFetch::UnitOfWork.new(queue, msg)
    end

    it 'processes as expected' do
      msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] })
      @processor.process(work(msg))
      assert_equal 1, $invokes
    end

    it 'executes a worker as expected' do
      worker = Minitest::Mock.new
      worker.expect(:perform, nil, [1, 2, 3])
      @processor.execute_job(worker, [1, 2, 3])
    end

    it 're-raises exceptions after handling' do
      msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] })
      re_raise = false

      begin
        @processor.process(work(msg))
        flunk "Expected exception"
      rescue TestException
        re_raise = true
      end

      assert_equal 0, $invokes
      assert re_raise, "does not re-raise exceptions after handling"
    end

    it 'does not modify original arguments' do
      msg = { 'class' => MockWorker.to_s, 'args' => [['myarg']] }
      msgstr = Sidekiq.dump_json(msg)
      @mgr.expect(:processor_done, nil, [@processor])
      @processor.process(work(msgstr))
      assert_equal [['myarg']], msg['args']
    end

    describe 'acknowledgement' do
      class ExceptionRaisingMiddleware
        def initialize(raise_before_yield, raise_after_yield, skip)
          @raise_before_yield = raise_before_yield
          @raise_after_yield = raise_after_yield
          @skip = skip
        end

        def call(worker, item, queue)
          raise TEST_EXCEPTION if @raise_before_yield
          yield unless @skip
          raise TEST_EXCEPTION if @raise_after_yield
        end
      end

      let(:raise_before_yield) { false }
      let(:raise_after_yield) { false }
      let(:skip_job) { false }
      let(:worker_args) { ['myarg'] }
      let(:work) { Minitest::Mock.new }

      before do
        work.expect(:queue_name, 'queue:default')
        work.expect(:job, Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => worker_args }))
        Sidekiq.server_middleware do |chain|
          chain.prepend ExceptionRaisingMiddleware, raise_before_yield, raise_after_yield, skip_job
        end
      end

      after do
        Sidekiq.server_middleware do |chain|
          chain.remove ExceptionRaisingMiddleware
        end
        work.verify
      end

      describe 'middleware throws an exception before processing the work' do
        let(:raise_before_yield) { true }

        it 'does not ack' do
          begin
            @processor.process(work)
            flunk "Expected #process to raise exception"
          rescue TestException
          end
        end
      end

      describe 'middleware throws an exception after processing the work' do
        let(:raise_after_yield) { true }

        it 'acks the job' do
          work.expect(:acknowledge, nil)
          begin
            @processor.process(work)
            flunk "Expected #process to raise exception"
          rescue TestException
          end
        end
      end

      describe 'middleware decides to skip work' do
        let(:skip_job) { true }

        it 'acks the job' do
          work.expect(:acknowledge, nil)
          @mgr.expect(:processor_done, nil, [@processor])
          @processor.process(work)
        end
      end

      describe 'worker raises an exception' do
        let(:worker_args) { ['boom'] }

        it 'acks the job' do
          work.expect(:acknowledge, nil)
          begin
            @processor.process(work)
            flunk "Expected #process to raise exception"
          rescue TestException
          end
        end
      end

      describe 'everything goes well' do
        it 'acks the job' do
          work.expect(:acknowledge, nil)
          @mgr.expect(:processor_done, nil, [@processor])
          @processor.process(work)
        end
      end
    end

    describe 'stats' do
      before do
        Sidekiq.redis {|c| c.flushdb }
      end

      describe 'when successful' do
        let(:processed_today_key) { "stat:processed:#{Time.now.utc.strftime("%Y-%m-%d")}" }

        def successful_job
          msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] })
          @mgr.expect(:processor_done, nil, [@processor])
          @processor.process(work(msg))
        end

        it 'increments processed stat' do
          Sidekiq::Processor::PROCESSED.value = 0
          successful_job
          assert_equal 1, Sidekiq::Processor::PROCESSED.value
        end
      end

      describe 'when failed' do
        let(:failed_today_key) { "stat:failed:#{Time.now.utc.strftime("%Y-%m-%d")}" }

        def failed_job
          msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] })
          begin
            @processor.process(work(msg))
          rescue TestException
          end
        end

        it 'increments failed stat' do
          Sidekiq::Processor::FAILURE.value = 0
          failed_job
          assert_equal 1, Sidekiq::Processor::FAILURE.value
        end
      end
    end
  end
end