1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
|
# frozen_string_literal: true
require_relative 'helper'
require 'sidekiq/cli'
require 'sidekiq/fetch'
require 'sidekiq/scheduled'
require 'sidekiq/processor'
class TestActors < Sidekiq::Test
class JoeWorker
include Sidekiq::Worker
def perform(slp)
raise "boom" if slp == "boom"
sleep(slp) if slp > 0
$count += 1
end
end
describe 'threads' do
before do
Sidekiq.redis {|c| c.flushdb}
end
describe 'scheduler' do
it 'can start and stop' do
f = Sidekiq::Scheduled::Poller.new
f.start
f.terminate
end
it 'can schedule' do
ss = Sidekiq::ScheduledSet.new
q = Sidekiq::Queue.new
JoeWorker.perform_in(0.01, 0)
assert_equal 0, q.size
assert_equal 1, ss.size
sleep 0.015
s = Sidekiq::Scheduled::Poller.new
s.enqueue
assert_equal 1, q.size
assert_equal 0, ss.size
s.terminate
end
end
describe 'processor' do
before do
$count = 0
end
it 'can start and stop' do
f = Sidekiq::Processor.new(Mgr.new)
f.terminate
end
class Mgr
attr_reader :latest_error
attr_reader :mutex
attr_reader :cond
def initialize
@mutex = ::Mutex.new
@cond = ::ConditionVariable.new
end
def processor_died(inst, err)
@latest_error = err
@mutex.synchronize do
@cond.signal
end
end
def processor_stopped(inst)
@mutex.synchronize do
@cond.signal
end
end
def options
{ :concurrency => 3, :queues => ['default'] }
end
end
it 'can process' do
mgr = Mgr.new
p = Sidekiq::Processor.new(mgr)
JoeWorker.perform_async(0)
a = $count
p.process_one
b = $count
assert_equal a + 1, b
end
it 'deals with errors' do
mgr = Mgr.new
p = Sidekiq::Processor.new(mgr)
JoeWorker.perform_async("boom")
q = Sidekiq::Queue.new
assert_equal 1, q.size
a = $count
mgr.mutex.synchronize do
p.start
mgr.cond.wait(mgr.mutex)
end
b = $count
assert_equal a, b
sleep 0.001
assert_equal false, p.thread.status
p.terminate(true)
refute_nil mgr.latest_error
assert_equal RuntimeError, mgr.latest_error.class
end
it 'gracefully kills' do
mgr = Mgr.new
p = Sidekiq::Processor.new(mgr)
JoeWorker.perform_async(1)
q = Sidekiq::Queue.new
assert_equal 1, q.size
a = $count
p.start
sleep(0.02)
p.terminate
p.kill(true)
b = $count
assert_equal a, b
assert_equal false, p.thread.status
refute mgr.latest_error, mgr.latest_error.to_s
end
end
end
end
|