File: active_record.rb

package info (click to toggle)
ruby-delayed-job-active-record 4.1.11-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 460 kB
  • sloc: ruby: 459; makefile: 6
file content (213 lines) | stat: -rw-r--r-- 8,831 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# frozen_string_literal: true

require "active_record/version"
module Delayed
  module Backend
    module ActiveRecord
      # Holds the backend-wide settings. The only setting visible here is the
      # SQL strategy used when reserving jobs.
      class Configuration
        attr_reader :reserve_sql_strategy

        # A fresh configuration starts with the :optimized_sql strategy.
        def initialize
          self.reserve_sql_strategy = :optimized_sql
        end

        # Stores the reservation strategy.
        # Raises ArgumentError unless +val+ is :optimized_sql or :default_sql.
        def reserve_sql_strategy=(val)
          supported = val == :optimized_sql || val == :default_sql
          raise ArgumentError, "allowed values are :optimized_sql or :default_sql" unless supported

          @reserve_sql_strategy = val
        end
      end

      # Returns the memoized Configuration, building it on first access.
      def self.configuration
        return @configuration if @configuration

        @configuration = Configuration.new
      end

      # Yields the object returned by +configuration+ to the caller's block and
      # returns the block's result.
      def self.configure
        settings = configuration
        yield settings
      end

      # A job object that is persisted to the database.
      # Contains the work object as a YAML field.
      class Job < ::ActiveRecord::Base
        include Delayed::Backend::Base

        # Declare the mass-assignable attributes only on ActiveRecord < 4 or
        # when ::ActiveRecord::MassAssignmentSecurity is defined.
        if ::ActiveRecord::VERSION::MAJOR < 4 || defined?(::ActiveRecord::MassAssignmentSecurity)
          attr_accessible(
            :priority, :run_at, :queue, :payload_object,
            :failed_at, :locked_at, :locked_by, :handler
          )
        end

        # Ordering and filtering scopes. The priority lambdas return nil when the
        # matching Worker setting is falsy, and the queue lambda returns nil when
        # the queue list is empty.
        scope :by_priority, -> { order("priority ASC, run_at ASC") }
        scope :min_priority, -> { where("priority >= ?", Worker.min_priority) if Worker.min_priority }
        scope :max_priority, -> { where("priority <= ?", Worker.max_priority) if Worker.max_priority }
        scope :for_queues, ->(queues = Worker.queues) { where(queue: queues) if Array(queues).any? }

        before_save :set_default_run_at

        # Sets this model's table name to "delayed_jobs", prefixed with the
        # table_name_prefix read from ::ActiveRecord::Base.
        def self.set_delayed_job_table_name
          prefix = ::ActiveRecord::Base.table_name_prefix
          self.table_name = "#{prefix}delayed_jobs"
        end

        set_delayed_job_table_name

        # Builds the scope of jobs that +worker_name+ may pick up right now.
        #
        # A job qualifies when failed_at IS NULL and either
        # * run_at is not after the current time and the job is unlocked, or its
        #   lock is older than +max_run_time+; or
        # * it is already locked by +worker_name+.
        #
        # worker_name  - value compared against the locked_by column.
        # max_run_time - duration subtracted from the current time to get the
        #                lock-expiry cutoff.
        #
        # Returns whatever +where+ returns for these conditions.
        def self.ready_to_run(worker_name, max_run_time)
          # Read the clock once so both bounds are derived from the same instant.
          now = db_time_now
          where(
            "((run_at <= ? AND (locked_at IS NULL OR locked_at < ?)) OR locked_by = ?) AND failed_at IS NULL",
            now,
            now - max_run_time,
            worker_name
          )
        end

        # Clears all connections on the ActiveRecord connection handler.
        # On ActiveRecord 7.1.0 or newer, clear_all_connections! is called with
        # :all; on older versions it is called without arguments.
        def self.before_fork
          running = Gem::Version.new(::ActiveRecord::VERSION::STRING)
          clear_args = running >= Gem::Version.new("7.1.0") ? [:all] : []
          ::ActiveRecord::Base.connection_handler.clear_all_connections!(*clear_args)
        end

        # Calls ::ActiveRecord::Base.establish_connection with no arguments.
        # NOTE(review): presumably run in the child process after a fork, as the
        # counterpart of before_fork — confirm against the caller.
        def self.after_fork
          ::ActiveRecord::Base.establish_connection
        end

        # Releases every job still locked by +worker_name+ by setting locked_by
        # and locked_at to nil. Intended for use when a worker is exiting, so it
        # leaves no locked jobs behind. Returns the result of update_all.
        def self.clear_locks!(worker_name)
          held_by_worker = where(locked_by: worker_name)
          held_by_worker.update_all(locked_by: nil, locked_at: nil)
        end

        # Reserves a job for +worker+.
        #
        # Starts from ready_to_run(worker.name, max_run_time), applies the
        # min_priority, max_priority, for_queues and by_priority scopes in that
        # order, and passes the resulting scope to reserve_with_scope together
        # with the worker and the current db_time_now.
        #
        # max_run_time defaults to Worker.max_run_time.
        def self.reserve(worker, max_run_time = Worker.max_run_time)
          candidates = ready_to_run(worker.name, max_run_time)
          candidates = candidates.min_priority.max_priority
          candidates = candidates.for_queues.by_priority

          reserve_with_scope(candidates, worker, db_time_now)
        end

        # Dispatches to the reservation routine selected by the configured
        # reserve_sql_strategy. Returns nil if the strategy is neither
        # :optimized_sql nor :default_sql.
        def self.reserve_with_scope(ready_scope, worker, now)
          strategy = Delayed::Backend::ActiveRecord.configuration.reserve_sql_strategy

          if strategy == :optimized_sql
            # Optimizations for faster lookups on some common databases.
            reserve_with_scope_using_optimized_sql(ready_scope, worker, now)
          elsif strategy == :default_sql
            # Slower, but in some cases a less problematic way to look up records.
            # See https://github.com/collectiveidea/delayed_job_active_record/pull/89 for more details.
            reserve_with_scope_using_default_sql(ready_scope, worker, now)
          end
        end

        # Chooses a database-specific reservation routine from the connection's
        # adapter_name; adapters not listed here fall back to the default SQL
        # routine.
        def self.reserve_with_scope_using_optimized_sql(ready_scope, worker, now)
          adapter = connection.adapter_name

          if %w[PostgreSQL PostGIS].include?(adapter)
            reserve_with_scope_using_optimized_postgres(ready_scope, worker, now)
          elsif %w[MySQL Mysql2 Trilogy].include?(adapter)
            reserve_with_scope_using_optimized_mysql(ready_scope, worker, now)
          elsif %w[MSSQL Teradata].include?(adapter)
            reserve_with_scope_using_optimized_mssql(ready_scope, worker, now)
          else
            # Fallback for unknown / other DBMS
            reserve_with_scope_using_default_sql(ready_scope, worker, now)
          end
        end

        # The old-fashioned, tried-and-true, but possibly slower lookup.
        #
        # Reads up to worker.read_ahead candidate rows, selecting only the id, and
        # tries to lock them one at a time. The full record is loaded (via reload)
        # only after a lock succeeded, which can matter noticeably with large
        # read_ahead settings and large job payloads.
        #
        # Returns the first job that was locked, or nil when none could be.
        def self.reserve_with_scope_using_default_sql(ready_scope, worker, now)
          candidates = ready_scope.limit(worker.read_ahead).select(:id)

          # NOTE: keep +detect+; +find+ on a relation is ActiveRecord's finder.
          candidates.detect do |candidate|
            lock_attempt = ready_scope.where(id: candidate.id)
            locked_rows = lock_attempt.update_all(locked_at: now, locked_by: worker.name)
            locked_rows == 1 && candidate.reload
          end
        end

        # PostgreSQL reservation: a single UPDATE ... RETURNING * whose target
        # row is chosen by a locking sub-select. Returns the first row reported
        # by find_by_sql, or nil when there is none.
        def self.reserve_with_scope_using_optimized_postgres(ready_scope, worker, now)
          # Custom SQL is needed because postgres does not support UPDATE...LIMIT.
          # The sub-select locks the single candidate row 'FOR UPDATE':
          # http://www.postgresql.org/docs/9.0/static/sql-select.html#SQL-FOR-UPDATE-SHARE
          # Note: with a .limit() filter active_record would try to generate
          # UPDATE...LIMIT-like SQL for Postgres, but without 'FOR UPDATE', which
          # would cause many locking conflicts.
          locking_select = ready_scope.limit(1).lock(true).select("id").to_sql

          # On PostgreSQL >= 9.5 use SKIP LOCKED so workers competing for the next
          # available job do not block each other.
          # https://www.postgresql.org/docs/9.5/sql-select.html#SQL-FOR-UPDATE-SHARE
          skip_locked = connection.send(:postgresql_version) >= 9_05_00 # rubocop:disable Style/NumericLiterals
          locking_select = "#{locking_select} SKIP LOCKED" if skip_locked

          table = connection.quote_table_name(table_name)
          update_sql = "UPDATE #{table} SET locked_at = ?, locked_by = ? WHERE id IN (#{locking_select}) RETURNING *"

          find_by_sql([update_sql, now, worker.name]).first
        end

        # MySQL reservation: lock one row with an UPDATE limited to a single
        # record, then read that row back with a second query.
        # Returns the locked job, or nil when the update touched no rows.
        def self.reserve_with_scope_using_optimized_mysql(ready_scope, worker, now)
          # Strip the sub-second part of +now+. From MySQL 5.6.4 on, the WHERE
          # clause compares with fractional precision, while the stored datetime
          # has none, so an untruncated value would not match the row just locked.
          now = now.change(usec: 0)

          # Works on MySQL and possibly other databases that support
          # UPDATE...LIMIT; locking and fetching are separate queries.
          claimed = ready_scope.limit(1).update_all(locked_at: now, locked_by: worker.name)

          where(locked_at: now, locked_by: worker.name, failed_at: nil).first unless claimed == 0
        end

        # MSSQL/Teradata reservation: lock one row through a hand-built UPDATE
        # with an id sub-select, then read the locked row back.
        # Returns the locked job, or nil when the update reported 0 rows.
        def self.reserve_with_scope_using_optimized_mssql(ready_scope, worker, now)
          # The MSSQL driver doesn't generate a limit clause when update_all is
          # called directly, so the limited query is embedded by hand.
          limited_sql = ready_scope.limit(1).to_sql
          # select("id") doesn't generate a subquery, so wrap the query explicitly.
          id_sql = "SELECT id FROM (#{limited_sql}) AS x"

          table = connection.quote_table_name(table_name)
          update_sql = "UPDATE #{table} SET locked_at = ?, locked_by = ? WHERE id IN (#{id_sql})"
          affected = connection.execute(sanitize_sql([update_sql, now, worker.name]))

          # MSSQL JDBC doesn't support OUTPUT INSERTED.* for returning a result
          # set, so the locked row is queried separately.
          where(locked_at: now, locked_by: worker.name, failed_at: nil).first unless affected == 0
        end

        # Returns the current time: Time.zone.now when Time.zone is set, otherwise
        # UTC when default_timezone is :utc, otherwise the local Time.now.
        # Note: the database is never asked for the time, so all clients must
        # have synchronized clocks.
        def self.db_time_now
          return Time.zone.now if Time.zone
          return Time.now.utc if default_timezone == :utc

          Time.now # rubocop:disable Rails/TimeZone
        end

        # Reads default_timezone from ::ActiveRecord when that module responds to
        # it, and from ::ActiveRecord::Base otherwise.
        def self.default_timezone
          source = ::ActiveRecord.respond_to?(:default_timezone) ? ::ActiveRecord : ::ActiveRecord::Base
          source.default_timezone
        end

        # Calls +reset+ and then delegates to the inherited +reload+ with the
        # same arguments, returning its result.
        # NOTE(review): +reset+ is not defined in this file; presumably it comes
        # from Delayed::Backend::Base and clears cached state — confirm.
        def reload(*args)
          reset
          super
        end
      end
    end
  end
end