# frozen_string_literal: true
module EachBatch
extend ActiveSupport::Concern
include LooseIndexScan
class_methods do
# Iterates over the rows in a relation in batches, similar to Rails'
# `in_batches` but in a more efficient way.
#
# Unlike `in_batches` provided by Rails this method does not support a
# custom start/end range, nor does it provide support for the `load:`
# keyword argument.
#
# This method will yield an ActiveRecord::Relation to the supplied block, or
# return an Enumerator if no block is given.
#
# Example:
#
# User.each_batch do |relation|
# relation.update_all(updated_at: Time.current)
# end
#
# The supplied block is also passed an optional batch index:
#
# User.each_batch do |relation, index|
# puts index # => 1, 2, 3, ...
# end
#
# You can also specify an alternative column to use for ordering the rows:
#
# User.each_batch(column: :created_at) do |relation|
# ...
# end
#
# This will produce SQL queries along the lines of:
#
# User Load (0.7ms) SELECT "users"."id" FROM "users" WHERE ("users"."id" >= 41654)
# ORDER BY "users"."id" ASC LIMIT 1 OFFSET 1000
# (0.7ms) SELECT COUNT(*) FROM "users" WHERE ("users"."id" >= 41654) AND ("users"."id" < 42687)
#
# of - The number of rows to retrieve per batch.
# column - The column to use for ordering the batches.
# order_hint - An optional column to append to the `ORDER BY id`
#   clause to help the query planner. PostgreSQL might perform badly
#   with a LIMIT 1 because the planner is guessing that scanning the
#   index in ID order will come across the desired row in less time
#   than it would take using another index. The order_hint does not
#   affect the search results. For example, `ORDER BY id ASC,
#   updated_at ASC` means the same thing as `ORDER BY id ASC`.
# Yields batches of the relation as ActiveRecord::Relation objects.
#
# of:          - Number of rows per batch.
# column:      - Column used for ordering/keyset pagination (default: primary key).
# order:       - :asc or :desc scan direction.
# order_hint:  - Optional extra ORDER BY column to help the query planner.
# reset_order: - When true (default) any pre-existing ORDER BY on the
#                relation is replaced (reorder); otherwise it is appended to (order).
#
# Raises ArgumentError when column: is nil.
def each_batch(of: 1000, column: primary_key, order: :asc, order_hint: nil, reset_order: true)
  unless column
    raise ArgumentError,
      'the column: argument must be set to a column name to use for ordering rows'
  end

  order_with = reset_order ? :reorder : :order

  # Seed the iteration with the first row in scan order. SELECT/includes/
  # preload are stripped so only the ordering column is fetched.
  start = except(:select, :includes, :preload)
    .select(column)
    .public_send(order_with, column => order)

  start = start.order(order_hint) if order_hint
  start = start.take

  return unless start

  start_id = start[column]
  arel_table = self.arel_table

  1.step do |index|
    # Inclusive lower bound of the current batch (upper bound when descending).
    start_cond = arel_table[column].gteq(start_id)
    start_cond = arel_table[column].lteq(start_id) if order == :desc

    # Peek `of` rows ahead to find the exclusive boundary of this batch.
    stop = except(:select, :includes, :preload)
      .select(column)
      .where(start_cond)
      .public_send(order_with, column => order)

    stop = stop.order(order_hint) if order_hint
    stop = stop
      .offset(of)
      .limit(1)
      .take

    relation = where(start_cond)

    if stop
      stop_id = stop[column]
      start_id = stop_id
      stop_cond = arel_table[column].lt(stop_id)
      stop_cond = arel_table[column].gt(stop_id) if order == :desc
      relation = relation.where(stop_cond)
    end

    # Any ORDER BYs are useless for this relation and can lead to less
    # efficient UPDATE queries, hence we get rid of it.
    relation = relation.except(:order)

    # Using unscoped is necessary to prevent leaking the current scope used by
    # ActiveRecord to chain `each_batch` method.
    unscoped { yield relation, index }

    break unless stop
  end
end
# Iterates over the rows in a relation in batches by skipping duplicated values in the column.
# Example: counting the number of distinct authors in `issues`
#
# - Table size: 100_000
# - Column: author_id
# - Distinct author_ids in the table: 1000
#
# The query will read maximum 1000 rows if we have index coverage on author_id.
#
# > count = 0
# > Issue.distinct_each_batch(column: 'author_id', of: 1000) { |r| count += r.count(:author_id) }
# Yields batches covering the distinct values of `column`, using a loose
# index scan (see LooseIndexScan) so only one row per distinct value is read.
#
# column: - Column whose distinct values drive the iteration (required).
# order:  - :asc or :desc scan direction.
# of:     - Number of distinct values per batch.
def distinct_each_batch(column:, order: :asc, of: 1000)
  start = except(:select)
    .select(column)
    .reorder(column => order)

  start = start.take

  return unless start

  start_id = start[column]
  arel_table = self.arel_table
  arel_column = arel_table[column.to_s]

  # The comparison operators depend only on the scan direction, so resolve
  # them once instead of branching inside every loose_index_scan block:
  # ascending scans advance an inclusive lower bound (>=) towards an
  # exclusive upper bound (<), descending scans mirror that (<= / >).
  start_op = order == :asc ? :gteq : :lteq
  stop_op = order == :asc ? :lt : :gt

  1.step do |index|
    # Look `of` distinct values ahead to find the exclusive boundary of
    # this batch; nil means we're on the last batch.
    stop = loose_index_scan(column: column, order: order) do |cte_query, inner_query|
      [cte_query.where(arel_column.public_send(start_op, start_id)), inner_query]
    end.offset(of).take

    if stop
      stop_id = stop[column]

      relation = loose_index_scan(column: column, order: order) do |cte_query, inner_query|
        [cte_query.where(arel_column.public_send(start_op, start_id)),
         inner_query.where(arel_column.public_send(stop_op, stop_id))]
      end

      start_id = stop_id
    else
      # Last batch: no upper bound, take everything from start_id onwards.
      relation = loose_index_scan(column: column, order: order) do |cte_query, inner_query|
        [cte_query.where(arel_column.public_send(start_op, start_id)), inner_query]
      end
    end

    # unscoped prevents the current scope from leaking into the yielded relation.
    unscoped { yield relation, index }

    break unless stop
  end
end
# Iterates over the relation and counts the rows. The counting
# logic is combined with the iteration query which saves one query
# compared to a standard each_batch approach.
#
# Basic usage:
# count, _last_value = Project.each_batch_count
#
# The counting can be stopped by passing a block and making the last statement true.
# Example:
#
# query_count = 0
# count, last_value = Project.each_batch_count do
# query_count += 1
# query_count == 5 # stop counting after 5 loops
# end
#
# Resume where the previous counting has stopped:
#
# count, last_value = Project.each_batch_count(last_count: count, last_value: last_value)
#
# Another example, counting issues in project:
#
# project = Project.find(1)
# count, _ = project.issues.each_batch_count(column: :iid)
# Counts rows batch by batch, combining the count with the keyset-pagination
# query so each batch costs a single query. Returns [count, last_value] where
# last_value is nil once the whole relation has been counted (see examples above).
#
# of:         - Number of rows counted per query.
# column:     - Ordering column used for keyset pagination (default: :id).
# last_count: - Running count to resume from (from a previous invocation).
# last_value: - Ordering-column value to resume after (from a previous invocation).
def each_batch_count(of: 1000, column: :id, last_count: 0, last_value: nil)
  arel_table = self.arel_table

  # Window ordered by `column`: ROW_NUMBER() numbers the rows within the
  # batch and LAST_VALUE(column) exposes the ordering value of the row we
  # land on, which becomes the resume cursor.
  window = Arel::Nodes::Window.new.order(arel_table[column])
  last_value_column = Arel::Nodes::NamedFunction
    .new('LAST_VALUE', [arel_table[column]])
    .over(window)
    .as(column.to_s)

  loop do
    # ROW_NUMBER() + last_count turns the per-batch row number into a
    # running total across batches/invocations.
    count_column = Arel::Nodes::Addition
      .new(Arel::Nodes::NamedFunction.new('ROW_NUMBER', []).over(window), last_count)
      .as('count')

    projections = [count_column, last_value_column]
    # Jump straight to the last row of the batch: OFFSET of - 1, LIMIT 1.
    scope = limit(1).offset(of - 1)
    scope = scope.where(arel_table[column].gt(last_value)) if last_value
    # pick returns [count, cursor] for that single row (nil, nil when the
    # batch has fewer than `of` rows).
    new_count, last_value = scope.pick(*projections)

    # When reaching the last batch the offset query might return no data, to address this
    # problem, we invoke a specialized query that takes the last row out of the resultset.
    # We could do this for each batch, however it would add unnecessary overhead to all
    # queries.
    if new_count.nil?
      # Same projection without LIMIT/OFFSET, wrapped as a subquery aliased
      # to the table name so the outer query can order by the running count.
      inner_query = scope
        .select(*projections)
        .limit(nil)
        .offset(nil)
        .arel
        .as(quoted_table_name)

      # NOTE(review): `unscope(where: :type)` presumably drops an STI type
      # condition already applied inside the subquery — confirm against callers.
      new_count, last_value =
        unscoped
          .from(inner_query)
          .unscope(where: :type)
          .order(count: :desc)
          .limit(1)
          .pick(:count, column)

      last_count = new_count if new_count
      # nil cursor signals the caller that counting is complete.
      last_value = nil
      break
    end

    last_count = new_count

    # The optional block can stop early by returning true; the returned
    # [last_count, last_value] pair can be fed back in to resume.
    if block_given?
      should_break = yield(last_count, last_value)
      break if should_break
    end
  end

  [last_count, last_value]
end
end
end