File: test_processor.rb

package info (click to toggle)
ruby-sidekiq 4.2.3+dfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 1,064 kB
  • ctags: 754
  • sloc: ruby: 7,384; makefile: 26; sh: 4
file content (201 lines) | stat: -rw-r--r-- 5,787 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# frozen_string_literal: true
require_relative 'helper'
require 'sidekiq/fetch'
require 'sidekiq/cli'
require 'sidekiq/processor'

# Specs for Sidekiq::Processor: running a job, propagating worker and
# middleware exceptions, acknowledging fetched work, and bumping the
# processed/failed counters.
class TestProcessor < Sidekiq::Test
  TestException = Class.new(StandardError)
  # Single shared instance raised by MockWorker and ExceptionRaisingMiddleware.
  TEST_EXCEPTION = TestException.new("kerboom!")

  describe 'processor' do
    before do
      # MockWorker#perform increments this global; every example starts at zero.
      $invokes = 0
      @mgr = Minitest::Mock.new
      # Two :options expectations are queued before the processor is built
      # (presumably Processor.new reads the manager options twice — confirm).
      @mgr.expect(:options, { queues: ['default'] })
      @mgr.expect(:options, { queues: ['default'] })
      @processor = ::Sidekiq::Processor.new(@mgr)
    end

    # Worker used by every example: raises TEST_EXCEPTION for the argument
    # 'boom', mutates array arguments, and counts successful invocations.
    class MockWorker
      include Sidekiq::Worker
      def perform(args)
        raise TEST_EXCEPTION if args == 'boom'
        args.pop if args.is_a? Array
        $invokes += 1
      end
    end

    # Wraps a serialized job payload in a unit of work for the given queue.
    def work(msg, queue='queue:default')
      Sidekiq::BasicFetch::UnitOfWork.new(queue, msg)
    end

    it 'processes as expected' do
      msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] })
      @processor.process(work(msg))
      assert_equal 1, $invokes
    end

    it 'executes a worker as expected' do
      worker = Minitest::Mock.new
      worker.expect(:perform, nil, [1, 2, 3])
      @processor.execute_job(worker, [1, 2, 3])
      # Without verify the example would pass even if #perform were never called.
      worker.verify
    end

    it 're-raises exceptions after handling' do
      msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] })

      assert_raises(TestException, "does not re-raise exceptions after handling") do
        @processor.process(work(msg))
      end
      assert_equal 0, $invokes
    end

    it 'does not modify original arguments' do
      # NOTE(review): the processor only receives the serialized string, so this
      # assertion inspects a hash the processor never sees — confirm that it
      # still exercises the intended argument-cloning behaviour.
      msg = { 'class' => MockWorker.to_s, 'args' => [['myarg']] }
      msgstr = Sidekiq.dump_json(msg)
      @mgr.expect(:processor_done, nil, [@processor])
      @processor.process(work(msgstr))
      assert_equal [['myarg']], msg['args']
    end

    describe 'acknowledgement' do
      # Server middleware that can raise TEST_EXCEPTION before or after
      # yielding to the rest of the chain, or skip yielding altogether.
      class ExceptionRaisingMiddleware
        def initialize(raise_before_yield, raise_after_yield, skip)
          @raise_before_yield = raise_before_yield
          @raise_after_yield = raise_after_yield
          @skip = skip
        end

        def call(worker, item, queue)
          raise TEST_EXCEPTION if @raise_before_yield
          yield unless @skip
          raise TEST_EXCEPTION if @raise_after_yield
        end
      end

      # Defaults describe the happy path; nested contexts override one at a time.
      let(:raise_before_yield) { false }
      let(:raise_after_yield) { false }
      let(:skip_job) { false }
      let(:worker_args) { ['myarg'] }
      # Shadows the #work helper above with a mock so that #acknowledge calls
      # can be verified.
      let(:work) { Minitest::Mock.new }

      before do
        work.expect(:queue_name, 'queue:default')
        work.expect(:job, Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => worker_args }))
        Sidekiq.server_middleware do |chain|
          chain.prepend ExceptionRaisingMiddleware, raise_before_yield, raise_after_yield, skip_job
        end
      end

      after do
        Sidekiq.server_middleware do |chain|
          chain.remove ExceptionRaisingMiddleware
        end
        # Checks that every expectation queued on the mock was consumed.
        work.verify
      end

      describe 'middleware throws an exception before processing the work' do
        let(:raise_before_yield) { true }

        it 'does not ack' do
          # No :acknowledge expectation is set, so the mock rejects any ack.
          assert_raises(TestException, "Expected #process to raise exception") do
            @processor.process(work)
          end
        end
      end

      describe 'middleware throws an exception after processing the work' do
        let(:raise_after_yield) { true }

        it 'acks the job' do
          work.expect(:acknowledge, nil)
          assert_raises(TestException, "Expected #process to raise exception") do
            @processor.process(work)
          end
        end
      end

      describe 'middleware decides to skip work' do
        let(:skip_job) { true }

        it 'acks the job' do
          work.expect(:acknowledge, nil)
          @mgr.expect(:processor_done, nil, [@processor])
          @processor.process(work)
        end
      end

      describe 'worker raises an exception' do
        let(:worker_args) { ['boom'] }

        it 'acks the job' do
          work.expect(:acknowledge, nil)
          assert_raises(TestException, "Expected #process to raise exception") do
            @processor.process(work)
          end
        end
      end

      describe 'everything goes well' do
        it 'acks the job' do
          work.expect(:acknowledge, nil)
          @mgr.expect(:processor_done, nil, [@processor])
          @processor.process(work)
        end
      end
    end

    describe 'stats' do
      before do
        Sidekiq.redis { |c| c.flushdb }
      end

      describe 'when successful' do
        # Runs one MockWorker job that completes without raising.
        def successful_job
          msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] })
          @mgr.expect(:processor_done, nil, [@processor])
          @processor.process(work(msg))
        end

        it 'increments processed stat' do
          Sidekiq::Processor::PROCESSED.value = 0
          successful_job
          assert_equal 1, Sidekiq::Processor::PROCESSED.value
        end
      end

      describe 'when failed' do
        # Runs one MockWorker job that raises; the exception is swallowed on
        # purpose so the caller can inspect the failure counter afterwards.
        def failed_job
          msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] })
          @processor.process(work(msg))
        rescue TestException
          # Expected failure path — nothing to do.
        end

        it 'increments failed stat' do
          Sidekiq::Processor::FAILURE.value = 0
          failed_job
          assert_equal 1, Sidekiq::Processor::FAILURE.value
        end
      end
    end
  end
end