1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
|
# frozen_string_literal: true
require 'sidekiq/client'
require 'sidekiq/core_ext'
module Sidekiq
  ##
  # Include this module in your worker class and you can easily create
  # asynchronous jobs:
  #
  #   class HardWorker
  #     include Sidekiq::Worker
  #
  #     def perform(*args)
  #       # do some work
  #     end
  #   end
  #
  # Then in your Rails app, you can do this:
  #
  #   HardWorker.perform_async(1, 2, 3)
  #
  # Note that perform_async is a class method, perform is an instance method.
  module Worker
    # Job id for this worker instance. Nothing in this module assigns it;
    # presumably the code that runs the job sets it before calling #perform.
    attr_accessor :jid

    # Hook run when the module is included into +base+: extends it with
    # ClassMethods and declares the per-class settings via +class_attribute+.
    #
    # Raises ArgumentError when +base+ has ActiveJob::Base among its ancestors.
    def self.included(base)
      if base.ancestors.any? { |c| c.name == 'ActiveJob::Base' }
        raise ArgumentError, "You cannot include Sidekiq::Worker in an ActiveJob: #{base.name}"
      end

      base.extend(ClassMethods)
      base.class_attribute :sidekiq_options_hash
      base.class_attribute :sidekiq_retry_in_block
      base.class_attribute :sidekiq_retries_exhausted_block
    end

    # Returns the logger shared by all of Sidekiq.
    def logger
      Sidekiq.logger
    end

    ##
    # Carries the per-call options given to ClassMethods#set so that they
    # apply to exactly one push:
    #
    #   SomeWorker.set(queue: 'critical').perform_async(1, 2, 3)
    #
    # The options are held by this object rather than in thread-local state,
    # so a `SomeWorker.set(...)` that is never followed by a perform_* call
    # cannot leak its options into a later, unrelated push on the same thread.
    class Setter
      # +klass+ is the worker class to enqueue; +opts+ is a Hash of extra job
      # options (symbol or string keys), or nil for none.
      def initialize(klass, opts)
        @klass = klass
        @opts = opts
      end

      # Enqueues a job for immediate execution with the given arguments.
      # Returns whatever the worker class's client_push returns.
      def perform_async(*args)
        push('class' => @klass, 'args' => args)
      end

      # +interval+ must be a timestamp, numeric or something that acts
      # numeric (like an activesupport time interval). Values below
      # 1_000_000_000 are taken as seconds from now; anything else is used as
      # the absolute time itself.
      def perform_in(interval, *args)
        offset = interval.to_f
        now = Time.now.to_f
        run_at = offset < 1_000_000_000 ? now + offset : offset

        item = { 'class' => @klass, 'args' => args }
        # Optimization to enqueue something now that is scheduled to go out now or in the past
        item['at'] = run_at unless run_at <= now
        push(item)
      end
      alias_method :perform_at, :perform_in

      private

      # Job fields ('class', 'args', 'at') win over user options on conflict.
      def push(item)
        @klass.client_push((@opts || {}).stringify_keys.merge(item))
      end
    end

    module ClassMethods
      # The delay* helpers are not available on worker classes; each one
      # raises ArgumentError naming the perform_* method to call instead.
      def delay(*_args)
        raise ArgumentError, "Do not call .delay on a Sidekiq::Worker class, call .perform_async"
      end

      def delay_for(*_args)
        raise ArgumentError, "Do not call .delay_for on a Sidekiq::Worker class, call .perform_in"
      end

      def delay_until(*_args)
        raise ArgumentError, "Do not call .delay_until on a Sidekiq::Worker class, call .perform_at"
      end

      # Applies +options+ to the single job pushed through the returned
      # object:
      #
      #   SomeWorker.set(queue: 'critical').perform_in(60, 1, 2)
      #
      # Returns a Setter responding to perform_async, perform_in and
      # perform_at. Options are scoped to that object only.
      def set(options)
        Setter.new(self, options)
      end

      # Enqueues a job for this class with the given arguments.
      def perform_async(*args)
        Setter.new(self, nil).perform_async(*args)
      end

      # +interval+ must be a timestamp, numeric or something that acts
      # numeric (like an activesupport time interval).
      def perform_in(interval, *args)
        Setter.new(self, nil).perform_in(interval, *args)
      end
      alias_method :perform_at, :perform_in

      ##
      # Allows customization for this type of Worker.
      # Legal options:
      #
      #   queue - use a named queue for this Worker, default 'default'
      #   retry - enable the RetryJobs middleware for this Worker, *true* to use the default
      #      or *Integer* count
      #   backtrace - whether to save any error backtrace in the retry payload to display in web UI,
      #      can be true, false or an integer number of lines to save, default *false*
      #   pool - use the given Redis connection pool to push this type of job to a given shard.
      #
      # In practice, any option is allowed.  This is the main mechanism to configure the
      # options for a specific job.
      def sidekiq_options(opts = {})
        self.sidekiq_options_hash = get_sidekiq_options.merge(opts.stringify_keys)
      end

      # Stores +block+ as this class's sidekiq_retry_in_block.
      def sidekiq_retry_in(&block)
        self.sidekiq_retry_in_block = block
      end

      # Stores +block+ as this class's sidekiq_retries_exhausted_block.
      def sidekiq_retries_exhausted(&block)
        self.sidekiq_retries_exhausted_block = block
      end

      # Returns this class's options hash, initialising it from
      # Sidekiq.default_worker_options when it has not been set yet.
      def get_sidekiq_options # :nodoc:
        self.sidekiq_options_hash ||= Sidekiq.default_worker_options
      end

      # Pushes +item+ (keys stringified) through a Sidekiq::Client.
      # Pool precedence: Thread.current[:sidekiq_via_pool], then this class's
      # 'pool' option, then Sidekiq.redis_pool.
      def client_push(item) # :nodoc:
        pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options['pool'] || Sidekiq.redis_pool
        Sidekiq::Client.new(pool).push(item.stringify_keys)
      end
    end
  end
end
|