1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
|
module Delayed
module Backend
module Base
def self.included(base)
base.extend ClassMethods
end
module ClassMethods
# Add a job to the queue
def enqueue(*args)
job_options = Delayed::Backend::JobPreparer.new(*args).prepare
enqueue_job(job_options)
end
def enqueue_job(options)
new(options).tap do |job|
Delayed::Worker.lifecycle.run_callbacks(:enqueue, job) do
job.hook(:enqueue)
Delayed::Worker.delay_job?(job) ? job.save : job.invoke_job
end
end
end
def reserve(worker, max_run_time = Worker.max_run_time)
# We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next.
# this leads to a more even distribution of jobs across the worker processes
find_available(worker.name, worker.read_ahead, max_run_time).detect do |job|
job.lock_exclusively!(max_run_time, worker.name)
end
end
# Allow the backend to attempt recovery from reserve errors
def recover_from(_error); end
# Hook method that is called before a new worker is forked
def before_fork; end
# Hook method that is called after a new worker is forked
def after_fork; end
def work_off(num = 100)
warn '[DEPRECATION] `Delayed::Job.work_off` is deprecated. Use `Delayed::Worker.new.work_off instead.'
Delayed::Worker.new.work_off(num)
end
end
attr_reader :error
def error=(error)
@error = error
self.last_error = "#{error.message}\n#{error.backtrace.join("\n")}" if respond_to?(:last_error=)
end
def failed?
!!failed_at
end
alias_method :failed, :failed?
ParseObjectFromYaml = %r{\!ruby/\w+\:([^\s]+)} # rubocop:disable ConstantName
def name
@name ||= payload_object.respond_to?(:display_name) ? payload_object.display_name : payload_object.class.name
rescue DeserializationError
ParseObjectFromYaml.match(handler)[1]
end
def payload_object=(object)
@payload_object = object
self.handler = object.to_yaml
end
def payload_object
@payload_object ||= YAML.load_dj(handler)
rescue TypeError, LoadError, NameError, ArgumentError, SyntaxError, Psych::SyntaxError => e
raise DeserializationError, "Job failed to load: #{e.message}. Handler: #{handler.inspect}"
end
def invoke_job
Delayed::Worker.lifecycle.run_callbacks(:invoke_job, self) do
begin
hook :before
payload_object.perform
hook :success
rescue Exception => e # rubocop:disable RescueException
hook :error, e
raise e
ensure
hook :after
end
end
end
# Unlock this job (note: not saved to DB)
def unlock
self.locked_at = nil
self.locked_by = nil
end
def hook(name, *args)
if payload_object.respond_to?(name)
method = payload_object.method(name)
method.arity.zero? ? method.call : method.call(self, *args)
end
rescue DeserializationError # rubocop:disable HandleExceptions
end
def reschedule_at
if payload_object.respond_to?(:reschedule_at)
payload_object.reschedule_at(self.class.db_time_now, attempts)
else
self.class.db_time_now + (attempts**4) + 5
end
end
def max_attempts
payload_object.max_attempts if payload_object.respond_to?(:max_attempts)
end
def max_run_time
return unless payload_object.respond_to?(:max_run_time)
return unless (run_time = payload_object.max_run_time)
if run_time > Delayed::Worker.max_run_time
Delayed::Worker.max_run_time
else
run_time
end
end
def destroy_failed_jobs?
payload_object.respond_to?(:destroy_failed_jobs?) ? payload_object.destroy_failed_jobs? : Delayed::Worker.destroy_failed_jobs
rescue DeserializationError
Delayed::Worker.destroy_failed_jobs
end
def fail!
self.failed_at = self.class.db_time_now
save!
end
protected
def set_default_run_at
self.run_at ||= self.class.db_time_now
end
# Call during reload operation to clear out internal state
def reset
@payload_object = nil
end
end
end
end
|