1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213
|
# frozen_string_literal: true
require "active_record/version"
module Delayed
module Backend
module ActiveRecord
# Backend-wide settings. The only setting is the SQL strategy used to reserve
# jobs, which must be :optimized_sql or :default_sql.
class Configuration
  attr_reader :reserve_sql_strategy

  # Starts out with the :optimized_sql strategy.
  def initialize
    self.reserve_sql_strategy = :optimized_sql
  end

  # Stores the reservation strategy.
  # Raises ArgumentError for anything other than :optimized_sql or :default_sql.
  def reserve_sql_strategy=(val)
    unless %i[optimized_sql default_sql].include?(val)
      raise ArgumentError, "allowed values are :optimized_sql or :default_sql"
    end

    @reserve_sql_strategy = val
  end
end
# Returns the shared Configuration, building it on first access and handing
# back the same instance on every later call.
def self.configuration
  @configuration = Configuration.new unless @configuration
  @configuration
end
# Yields the shared configuration object to the caller's block and returns
# whatever the block returns.
def self.configure
  settings = configuration
  yield settings
end
# A job object that is persisted to the database.
# Contains the work object as a YAML field.
class Job < ::ActiveRecord::Base
include Delayed::Backend::Base

# Declare the mass-assignable attributes only on ActiveRecord < 4 or when
# ::ActiveRecord::MassAssignmentSecurity is defined.
if ::ActiveRecord::VERSION::MAJOR < 4 || defined?(::ActiveRecord::MassAssignmentSecurity)
  attr_accessible(*%i[priority run_at queue payload_object failed_at locked_at locked_by handler])
end

# Orders by ascending priority value, then by ascending run_at.
scope :by_priority, -> { order("priority ASC, run_at ASC") }
# Each priority bound is added only when the matching Worker setting is set.
scope :min_priority, -> { where("priority >= ?", Worker.min_priority) if Worker.min_priority }
scope :max_priority, -> { where("priority <= ?", Worker.max_priority) if Worker.max_priority }
# Filters on the given queues (Worker.queues by default); the condition is
# added only when the list has entries.
scope :for_queues, ->(queues = Worker.queues) { where(queue: queues) if Array(queues).any? }

before_save :set_default_run_at
# Sets the model's table name to "delayed_jobs" prefixed with
# ::ActiveRecord::Base.table_name_prefix, and returns that name.
def self.set_delayed_job_table_name
  self.table_name = "#{::ActiveRecord::Base.table_name_prefix}delayed_jobs"
end
# Assign the table name while the class body is being evaluated.
set_delayed_job_table_name
# Scope of jobs the named worker may pick up: jobs that have not failed and
# are either already locked by this worker, or are due (run_at has passed)
# and unlocked or locked longer ago than max_run_time.
def self.ready_to_run(worker_name, max_run_time)
  runnable_sql = "((run_at <= ? AND (locked_at IS NULL OR locked_at < ?)) OR locked_by = ?) AND failed_at IS NULL"
  where(runnable_sql, db_time_now, db_time_now - max_run_time, worker_name)
end
# Clears all connections on the ActiveRecord connection handler.
# From ActiveRecord 7.1.0 onwards clear_all_connections! is called with :all;
# older versions are called without an argument.
def self.before_fork
  ar_version = Gem::Version.new(::ActiveRecord::VERSION::STRING)
  handler = ::ActiveRecord::Base.connection_handler

  if ar_version >= Gem::Version.new("7.1.0")
    handler.clear_all_connections!(:all)
  else
    handler.clear_all_connections!
  end
end
# Calls ::ActiveRecord::Base.establish_connection with no arguments.
# NOTE(review): presumably run in a forked child so it gets its own database
# connection after before_fork cleared the existing ones — confirm with the caller.
def self.after_fork
::ActiveRecord::Base.establish_connection
end
# Releases every lock held by the given worker, so none of its jobs stay
# locked once the worker exits. Returns the result of update_all.
def self.clear_locks!(worker_name)
  held_by_worker = where(locked_by: worker_name)
  held_by_worker.update_all(locked_by: nil, locked_at: nil)
end
# Builds the scope of jobs this worker may run (ready to run, within the
# priority bounds, in the worker's queues, ordered by priority) and passes it
# to reserve_with_scope together with the current database time.
# max_run_time defaults to Worker.max_run_time.
def self.reserve(worker, max_run_time = Worker.max_run_time)
  candidates = ready_to_run(worker.name, max_run_time)
  candidates = candidates.min_priority.max_priority.for_queues.by_priority

  reserve_with_scope(candidates, worker, db_time_now)
end
# Dispatches to the reservation routine selected by the configured
# reserve_sql_strategy.
def self.reserve_with_scope(ready_scope, worker, now)
  strategy = Delayed::Backend::ActiveRecord.configuration.reserve_sql_strategy

  if strategy == :optimized_sql
    # Optimizations for faster lookups on some common databases.
    reserve_with_scope_using_optimized_sql(ready_scope, worker, now)
  elsif strategy == :default_sql
    # Slower, but in some cases a less problematic way to look up records.
    # See https://github.com/collectiveidea/delayed_job_active_record/pull/89 for more details.
    reserve_with_scope_using_default_sql(ready_scope, worker, now)
  end
end
# Picks the adapter-specific reservation routine from the connection's
# adapter name; adapters not listed fall back to the default SQL routine.
def self.reserve_with_scope_using_optimized_sql(ready_scope, worker, now)
  args = [ready_scope, worker, now]

  case connection.adapter_name
  when "PostgreSQL", "PostGIS" then reserve_with_scope_using_optimized_postgres(*args)
  when "MySQL", "Mysql2", "Trilogy" then reserve_with_scope_using_optimized_mysql(*args)
  when "MSSQL", "Teradata" then reserve_with_scope_using_optimized_mssql(*args)
  else reserve_with_scope_using_default_sql(*args) # unknown / other DBMS
  end
end
# The old-fashioned, tried and true, but possibly slower lookup.
# Reads up to worker.read_ahead candidates and returns the first one this
# worker manages to lock, or nil when none could be locked.
def self.reserve_with_scope_using_default_sql(ready_scope, worker, now)
  # Only the id is selected for the candidates; the full record is read
  # (via reload) after the lock succeeded. This can have a noticeable impact
  # with large read_ahead settings and large job payloads.
  candidates = ready_scope.limit(worker.read_ahead).select(:id)

  candidates.detect do |candidate|
    # Re-applying ready_scope means the update touches the row only if it is
    # still eligible; exactly one updated row means the lock is ours.
    locked_rows = ready_scope.where(id: candidate.id).update_all(locked_at: now, locked_by: worker.name)
    locked_rows == 1 && candidate.reload
  end
end
# Locks and returns the next job with a single UPDATE ... RETURNING statement.
# Custom SQL is required because PostgreSQL does not support UPDATE...LIMIT.
def self.reserve_with_scope_using_optimized_postgres(ready_scope, worker, now)
  # The subquery locks the single candidate row 'FOR UPDATE':
  # http://www.postgresql.org/docs/9.0/static/sql-select.html#SQL-FOR-UPDATE-SHARE
  # Note: with a plain .limit() filter active_record would attempt to generate
  # UPDATE...LIMIT-like SQL for Postgres, but without 'FOR UPDATE', which
  # would cause many locking conflicts.
  lock_subquery = ready_scope.limit(1).lock(true).select("id").to_sql

  # On PostgreSQL >= 9.5, SKIP LOCKED keeps workers from blocking each other
  # while they try to get the next available job:
  # https://www.postgresql.org/docs/9.5/sql-select.html#SQL-FOR-UPDATE-SHARE
  skip_locked_supported = connection.send(:postgresql_version) >= 90_500
  lock_subquery = "#{lock_subquery} SKIP LOCKED" if skip_locked_supported

  table = connection.quote_table_name(table_name)
  update_sql = "UPDATE #{table} SET locked_at = ?, locked_by = ? WHERE id IN (#{lock_subquery}) RETURNING *"

  find_by_sql([update_sql, now, worker.name]).first
end
# Locks one job with UPDATE...LIMIT and then fetches it with a second query.
# This works on MySQL and possibly other DBs that support UPDATE...LIMIT.
# Returns nil when no row was updated.
def self.reserve_with_scope_using_optimized_mysql(ready_scope, worker, now)
  # Drop the sub-second part of the timestamp. MySQL >= 5.6.4 supports
  # fractional seconds, but the stored datetime has no precision, so the
  # fraction is discarded on update while the WHERE clause would still compare
  # with it. Truncating up front keeps the write and the lookup in agreement.
  lock_time = now.change(usec: 0)

  locked_rows = ready_scope.limit(1).update_all(locked_at: lock_time, locked_by: worker.name)
  return if locked_rows.zero?

  where(locked_at: lock_time, locked_by: worker.name, failed_at: nil).first
end
# Locks one job with a hand-built UPDATE and then fetches it with a second
# query. Returns nil when the UPDATE reports a count of 0.
def self.reserve_with_scope_using_optimized_mssql(ready_scope, worker, now)
  # The MSSQL driver doesn't generate a limit clause when update_all is
  # called directly, so the limited SELECT is rendered to SQL by hand.
  limited_sql = ready_scope.limit(1).to_sql
  # select("id") doesn't generate a subquery, so force one.
  id_subquery = "SELECT id FROM (#{limited_sql}) AS x"

  table = connection.quote_table_name(table_name)
  update_sql = "UPDATE #{table} SET locked_at = ?, locked_by = ? WHERE id IN (#{id_subquery})"
  affected = connection.execute(sanitize_sql([update_sql, now, worker.name]))
  return if affected == 0

  # MSSQL JDBC doesn't support OUTPUT INSERTED.* for returning a result set,
  # so the locked row is queried separately.
  where(locked_at: now, locked_by: worker.name, failed_at: nil).first
end
# Current time as the database is expected to see it: Time.zone.now when a
# zone is set, UTC when default_timezone is :utc, local time otherwise.
# Note: the time comes from this process, not from the DB, so all your
# clients must have synchronized clocks.
def self.db_time_now
  return Time.zone.now if Time.zone
  return Time.now.utc if default_timezone == :utc

  Time.now # rubocop:disable Rails/TimeZone
end
# Reads default_timezone from ::ActiveRecord when that module responds to it,
# otherwise from ::ActiveRecord::Base.
def self.default_timezone
  source = ::ActiveRecord.respond_to?(:default_timezone) ? ::ActiveRecord : ::ActiveRecord::Base
  source.default_timezone
end
# Reloads the record: calls `reset` first, then defers to the inherited
# reload, which receives the same arguments through the bare `super`.
# NOTE(review): `reset` is not defined in this file — presumably it comes from
# Delayed::Backend::Base and drops cached state derived from the row; confirm.
def reload(*args)
reset
super
end
end
end
end
end
|