1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
|
require 'spec_helper'
require 'fetch_shared_examples'
require 'sidekiq/base_reliable_fetch'
require 'sidekiq/reliable_fetch'
require 'sidekiq/semi_reliable_fetch'
describe Sidekiq::BaseReliableFetch do
let(:job) { Sidekiq.dump_json(class: 'Bob', args: [1, 2, 'foo']) }
before { Sidekiq.redis(&:flushdb) }
describe 'UnitOfWork' do
let(:fetcher) { Sidekiq::ReliableFetch.new(queues: ['foo']) }
describe '#requeue' do
it 'requeues job' do
Sidekiq.redis { |conn| conn.rpush('queue:foo', job) }
uow = fetcher.retrieve_work
uow.requeue
expect(Sidekiq::Queue.new('foo').size).to eq 1
expect(working_queue_size('foo')).to eq 0
end
end
describe '#acknowledge' do
it 'acknowledges job' do
Sidekiq.redis { |conn| conn.rpush('queue:foo', job) }
uow = fetcher.retrieve_work
expect { uow.acknowledge }
.to change { working_queue_size('foo') }.by(-1)
expect(Sidekiq::Queue.new('foo').size).to eq 0
end
end
end
describe '#bulk_requeue' do
let(:options) { { queues: %w[foo bar] } }
let!(:queue1) { Sidekiq::Queue.new('foo') }
let!(:queue2) { Sidekiq::Queue.new('bar') }
it 'requeues the bulk' do
uow = described_class::UnitOfWork
jobs = [ uow.new('queue:foo', job), uow.new('queue:foo', job), uow.new('queue:bar', job) ]
described_class.new(options).bulk_requeue(jobs, nil)
expect(queue1.size).to eq 2
expect(queue2.size).to eq 1
end
it 'puts jobs into interrupted queue' do
uow = described_class::UnitOfWork
interrupted_job = Sidekiq.dump_json(class: 'Bob', args: [1, 2, 'foo'], interrupted_count: 3)
jobs = [ uow.new('queue:foo', interrupted_job), uow.new('queue:foo', job), uow.new('queue:bar', job) ]
described_class.new(options).bulk_requeue(jobs, nil)
expect(queue1.size).to eq 1
expect(queue2.size).to eq 1
expect(Sidekiq::InterruptedSet.new.size).to eq 1
end
it 'does not put jobs into interrupted queue if it is disabled' do
options[:max_retries_after_interruption] = -1
uow = described_class::UnitOfWork
interrupted_job = Sidekiq.dump_json(class: 'Bob', args: [1, 2, 'foo'], interrupted_count: 3)
jobs = [ uow.new('queue:foo', interrupted_job), uow.new('queue:foo', job), uow.new('queue:bar', job) ]
described_class.new(options).bulk_requeue(jobs, nil)
expect(queue1.size).to eq 2
expect(queue2.size).to eq 1
expect(Sidekiq::InterruptedSet.new.size).to eq 0
end
end
it 'sets heartbeat' do
config = double(:sidekiq_config, options: { queues: %w[foo bar] })
heartbeat_thread = described_class.setup_reliable_fetch!(config)
Sidekiq.redis do |conn|
sleep 0.2 # Give the time to heartbeat thread to make a loop
heartbeat_key = described_class.heartbeat_key(described_class.identity)
heartbeat = conn.get(heartbeat_key)
expect(heartbeat).not_to be_nil
end
heartbeat_thread.kill
end
end
|