File: base_reliable_fetch_spec.rb

package info (click to toggle)
ruby-gitlab-sidekiq-fetcher 0.9.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 204 kB
  • sloc: ruby: 760; makefile: 3
file content (97 lines) | stat: -rw-r--r-- 3,040 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
require 'spec_helper'
require 'fetch_shared_examples'
require 'sidekiq/base_reliable_fetch'
require 'sidekiq/reliable_fetch'
require 'sidekiq/semi_reliable_fetch'

describe Sidekiq::BaseReliableFetch do
  # JSON payload for a sample job (class 'Bob' with three arguments) reused by
  # the examples in this file.
  let(:job) do
    Sidekiq.dump_json(class: 'Bob', args: [1, 2, 'foo'])
  end

  # Flush the Redis database used by Sidekiq before each example so that state
  # left behind by one example cannot affect the next.
  before do
    Sidekiq.redis { |conn| conn.flushdb }
  end

  describe 'UnitOfWork' do
    let(:fetcher) { Sidekiq::ReliableFetch.new(queues: ['foo']) }

    # Pushes the sample job onto 'queue:foo' and returns whatever the fetcher
    # hands back from #retrieve_work for it.
    def push_and_fetch
      Sidekiq.redis { |redis| redis.rpush('queue:foo', job) }
      fetcher.retrieve_work
    end

    describe '#requeue' do
      # After requeueing, the job must be back on the regular queue and the
      # working queue (as reported by the working_queue_size helper) empty.
      it 'requeues job' do
        unit = push_and_fetch
        unit.requeue

        expect(Sidekiq::Queue.new('foo').size).to eq(1)
        expect(working_queue_size('foo')).to eq(0)
      end
    end

    describe '#acknowledge' do
      # Acknowledging must shrink the working queue by one without putting the
      # job back on the regular queue.
      it 'acknowledges job' do
        unit = push_and_fetch

        expect { unit.acknowledge }.to change { working_queue_size('foo') }.by(-1)
        expect(Sidekiq::Queue.new('foo').size).to eq(0)
      end
    end
  end

  describe '#bulk_requeue' do
    let(:options) { { queues: %w[foo bar] } }
    let!(:queue1) { Sidekiq::Queue.new('foo') }
    let!(:queue2) { Sidekiq::Queue.new('bar') }

    let(:unit_of_work) { described_class::UnitOfWork }

    # Same payload as `job`, but already carrying an interrupted_count of 3.
    let(:interrupted_job) do
      Sidekiq.dump_json(class: 'Bob', args: [1, 2, 'foo'], interrupted_count: 3)
    end

    # Wraps every [queue_name, payload] pair in a UnitOfWork and passes the
    # whole batch to #bulk_requeue on a fetcher built from `options`.
    def requeue_in_bulk(*pairs)
      units = pairs.map { |queue_name, payload| unit_of_work.new(queue_name, payload) }
      described_class.new(options).bulk_requeue(units, nil)
    end

    it 'requeues the bulk' do
      requeue_in_bulk(['queue:foo', job], ['queue:foo', job], ['queue:bar', job])

      expect(queue1.size).to eq(2)
      expect(queue2.size).to eq(1)
    end

    it 'puts jobs into interrupted queue' do
      requeue_in_bulk(['queue:foo', interrupted_job], ['queue:foo', job], ['queue:bar', job])

      # The interrupted payload lands in the InterruptedSet instead of 'foo'.
      expect(queue1.size).to eq(1)
      expect(queue2.size).to eq(1)
      expect(Sidekiq::InterruptedSet.new.size).to eq(1)
    end

    it 'does not put jobs into interrupted queue if it is disabled' do
      # -1 is the value this example uses to switch the interrupted queue off.
      options[:max_retries_after_interruption] = -1

      requeue_in_bulk(['queue:foo', interrupted_job], ['queue:foo', job], ['queue:bar', job])

      expect(queue1.size).to eq(2)
      expect(queue2.size).to eq(1)
      expect(Sidekiq::InterruptedSet.new.size).to eq(0)
    end
  end

  # Activating reliable fetch returns a background thread; after giving it a
  # moment to run, the heartbeat key for this process's identity must exist in
  # Redis.
  it 'sets heartbeat' do
    config = double(:sidekiq_config, options: { queues: %w[foo bar] })

    heartbeat_thread = described_class.setup_reliable_fetch!(config)

    begin
      # Give the heartbeat thread time to complete a loop. Sleep outside of
      # Sidekiq.redis so no pooled connection is held while waiting for a
      # thread that needs a connection itself.
      sleep 0.2

      heartbeat_key = described_class.heartbeat_key(described_class.identity)
      heartbeat = Sidekiq.redis { |conn| conn.get(heartbeat_key) }

      expect(heartbeat).not_to be_nil
    ensure
      # Stop the background thread even when the expectation above fails, so
      # it cannot leak into later examples.
      heartbeat_thread.kill
    end
  end
end