File: test_actors.rb

package info (click to toggle)
ruby-sidekiq 4.2.3%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 1,064 kB
  • ctags: 754
  • sloc: ruby: 7,384; makefile: 26; sh: 4
file content (138 lines) | stat: -rw-r--r-- 2,979 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# frozen_string_literal: true
require_relative 'helper'
require 'sidekiq/cli'
require 'sidekiq/fetch'
require 'sidekiq/scheduled'
require 'sidekiq/processor'

class TestActors < Sidekiq::Test
  class JoeWorker
    include Sidekiq::Worker
    def perform(slp)
      raise "boom" if slp == "boom"
      sleep(slp) if slp > 0
      $count += 1
    end
  end

  describe 'threads' do
    before do
      Sidekiq.redis {|c| c.flushdb}
    end

    describe 'scheduler' do
      it 'can start and stop' do
        f = Sidekiq::Scheduled::Poller.new
        f.start
        f.terminate
      end

      it 'can schedule' do
        ss = Sidekiq::ScheduledSet.new
        q = Sidekiq::Queue.new

        JoeWorker.perform_in(0.01, 0)

        assert_equal 0, q.size
        assert_equal 1, ss.size

        sleep 0.015
        s = Sidekiq::Scheduled::Poller.new
        s.enqueue
        assert_equal 1, q.size
        assert_equal 0, ss.size
        s.terminate
      end
    end

    describe 'processor' do
      before do
        $count = 0
      end

      it 'can start and stop' do
        f = Sidekiq::Processor.new(Mgr.new)
        f.terminate
      end

      class Mgr
        attr_reader :latest_error
        attr_reader :mutex
        attr_reader :cond
        def initialize
          @mutex = ::Mutex.new
          @cond = ::ConditionVariable.new
        end
        def processor_died(inst, err)
          @latest_error = err
          @mutex.synchronize do
            @cond.signal
          end
        end
        def processor_stopped(inst)
          @mutex.synchronize do
            @cond.signal
          end
        end
        def options
          { :concurrency => 3, :queues => ['default'] }
        end
      end

      it 'can process' do
        mgr = Mgr.new

        p = Sidekiq::Processor.new(mgr)
        JoeWorker.perform_async(0)

        a = $count
        p.process_one
        b = $count
        assert_equal a + 1, b
      end

      it 'deals with errors' do
        mgr = Mgr.new

        p = Sidekiq::Processor.new(mgr)
        JoeWorker.perform_async("boom")
        q = Sidekiq::Queue.new
        assert_equal 1, q.size

        a = $count
        mgr.mutex.synchronize do
          p.start
          mgr.cond.wait(mgr.mutex)
        end
        b = $count
        assert_equal a, b

        sleep 0.001
        assert_equal false, p.thread.status
        p.terminate(true)
        refute_nil mgr.latest_error
        assert_equal RuntimeError, mgr.latest_error.class
      end

      it 'gracefully kills' do
        mgr = Mgr.new

        p = Sidekiq::Processor.new(mgr)
        JoeWorker.perform_async(1)
        q = Sidekiq::Queue.new
        assert_equal 1, q.size

        a = $count
        p.start
        sleep(0.02)
        p.terminate
        p.kill(true)

        b = $count
        assert_equal a, b
        assert_equal false, p.thread.status
        refute mgr.latest_error, mgr.latest_error.to_s
      end
    end
  end
end