1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
|
require 'sidekiq/api'
module Sidekiq
class InterruptedSet < ::Sidekiq::JobSet
DEFAULT_MAX_CAPACITY = 10_000
DEFAULT_MAX_TIMEOUT = 90 * 24 * 60 * 60 # 3 months
def initialize
super "interrupted"
end
def put(message, opts = {})
now = Time.now.to_f
with_multi_connection(opts[:connection]) do |conn|
conn.zadd(name, now.to_s, message)
conn.zremrangebyscore(name, '-inf', now - self.class.timeout)
conn.zremrangebyrank(name, 0, - self.class.max_jobs)
end
true
end
# Yield block inside an existing multi connection or creates new one
def with_multi_connection(conn, &block)
return yield(conn) if conn
Sidekiq.redis do |c|
c.multi do |multi|
yield(multi)
end
end
end
def retry_all
each(&:retry) while size > 0
end
def self.max_jobs
options[:interrupted_max_jobs] || DEFAULT_MAX_CAPACITY
end
def self.timeout
options[:interrupted_timeout_in_seconds] || DEFAULT_MAX_TIMEOUT
end
def self.options
Sidekiq.respond_to?(:[]) ? Sidekiq : Sidekiq.options
end
end
end
|