# frozen_string_literal: true
module ActiveRecord
class InsertAll # :nodoc:
# Inputs captured by #initialize: the target model, its connection, the list
# of attribute hashes, and the set of stringified keys of the first hash.
attr_reader :model, :connection, :inserts, :keys
# Options as normalized by #initialize. When a +unique_by+ option was given,
# +unique_by+ holds the matching unique index rather than the raw option.
attr_reader :on_duplicate, :returning, :unique_by
# Captures the model, its connection and the rows to insert, then normalizes
# the options:
#
# * +returning+ defaults to the primary keys when the connection supports
#   INSERT ... RETURNING, otherwise to +false+; an empty array becomes +false+.
# * +unique_by+ is resolved to a unique index (ArgumentError when none matches).
# * +on_duplicate: :update+ degrades to +:skip+ when no column is updatable.
#
# Raises ArgumentError when +inserts+ is blank or when the connection does not
# support one of the requested options.
def initialize(model, inserts, on_duplicate:, returning: nil, unique_by: nil)
  raise ArgumentError, "Empty list of attributes passed" if inserts.blank?

  @model = model
  @connection = model.connection
  @inserts = inserts
  # The first row defines the expected key set for every row.
  @keys = inserts.first.keys.map(&:to_s).to_set

  @on_duplicate = on_duplicate
  @returning = returning
  @unique_by = unique_by

  if @returning.nil?
    @returning = connection.supports_insert_returning? ? primary_keys : false
  end
  @returning = false if @returning == []

  @unique_by = find_unique_index_for(unique_by) if unique_by

  if @on_duplicate == :update && updatable_columns.empty?
    @on_duplicate = :skip
  end

  ensure_valid_options_for_connection!
end
# Runs the generated SQL through the connection, labelling the query
# "<model> [Bulk ]Insert" or "<model> [Bulk ]Upsert". Returns whatever
# +connection.exec_insert_all+ returns.
def execute
  bulk_tag = inserts.many? ? "Bulk " : ""
  action = update_duplicates? ? "Upsert" : "Insert"
  message = +"#{model} #{bulk_tag}#{action}"
  connection.exec_insert_all(to_sql, message)
end
# Keys that may be rewritten on conflict: every inserted key except the
# read-only columns and the columns of the +unique_by+ index.
def updatable_columns
  excluded = readonly_columns + unique_by_columns
  keys - excluded
end
# The model's primary key, always wrapped in an Array (empty when the model
# reports no primary key).
def primary_keys
  key = model.primary_key
  Array(key)
end
# True when conflicting rows should be skipped (+on_duplicate+ is +:skip+).
def skip_duplicates?
  case on_duplicate
  when :skip then true
  else false
  end
end
# True when conflicting rows should be updated (+on_duplicate+ is +:update+).
def update_duplicates?
  case on_duplicate
  when :update then true
  else false
  end
end
# Yields every (key, value) pair of every row, in +keys+ order, and returns
# the block results as one array per row. Row keys are stringified first and
# each row is checked against the expected key set.
def map_key_with_value
  inserts.map do |row|
    stringified = row.stringify_keys
    verify_attributes(stringified)

    keys.map { |column| yield column, stringified[column] }
  end
end
private
# Resolves the +unique_by+ option to one of the table's unique indexes.
# An index matches when its name is among the given values or when its
# column list equals them (same order). Raises ArgumentError otherwise.
def find_unique_index_for(unique_by)
  wanted = Array(unique_by).map(&:to_s)

  found = unique_indexes.find do |candidate|
    wanted.include?(candidate.name) || candidate.columns == wanted
  end

  found || raise(ArgumentError, "No unique index found for #{unique_by}")
end
# Indexes of the model's table (read from the schema cache) that are
# flagged as unique.
def unique_indexes
  table_indexes = connection.schema_cache.indexes(model.table_name)
  table_indexes.select { |index| index.unique }
end
# Raises ArgumentError for the first requested option the connection cannot
# honour, checked in this order: :returning, skipping duplicates, upsert,
# :unique_by. Returns nil when everything is supported.
def ensure_valid_options_for_connection!
  adapter = connection.class

  problem =
    if returning && !connection.supports_insert_returning?
      "#{adapter} does not support :returning"
    elsif skip_duplicates? && !connection.supports_insert_on_duplicate_skip?
      "#{adapter} does not support skipping duplicates"
    elsif update_duplicates? && !connection.supports_insert_on_duplicate_update?
      "#{adapter} does not support upsert"
    elsif unique_by && !connection.supports_insert_conflict_target?
      "#{adapter} does not support :unique_by"
    end

  raise ArgumentError, problem if problem
end
# Wraps this object in a Builder and lets the connection turn it into the
# INSERT statement.
def to_sql
  builder = ActiveRecord::InsertAll::Builder.new(self)
  connection.build_insert_sql(builder)
end
# Columns that must never be rewritten on conflict: the primary keys
# followed by the model's read-only attributes.
def readonly_columns
  pk_columns = primary_keys
  pk_columns + model.readonly_attributes.to_a
end
# Columns of the resolved +unique_by+ index as an Array; empty when no
# +unique_by+ index is set.
def unique_by_columns
  index = unique_by
  return [] if index.nil?

  Array(index.columns)
end
# Ensures a (stringified) row has exactly the expected key set.
# Raises ArgumentError on any missing or extra key; returns nil otherwise.
def verify_attributes(attributes)
  return if keys == attributes.keys.to_set

  raise ArgumentError, "All objects being inserted must have the same keys"
end
class Builder # :nodoc:
# The model targeted by the insert, taken from the wrapped InsertAll.
attr_reader :model
# Forward the duplicate-handling predicates and the key set to the wrapped
# InsertAll instance.
delegate :skip_duplicates?, :update_duplicates?, :keys, to: :insert_all
# Wraps an InsertAll and keeps direct references to its model and connection.
def initialize(insert_all)
  @insert_all = insert_all
  @model = insert_all.model
  @connection = insert_all.connection
end
# The "INTO <table> (<columns>)" fragment of the INSERT statement.
def into
  table = model.quoted_table_name
  "INTO #{table} (#{columns_list})"
end
# Compiles the VALUES list: every value is serialized with the attribute type
# of its column, passed through the connection's YAML fallback, and the
# resulting rows are compiled by the connection's Arel visitor.
def values_list
  column_types = extract_types_from_columns_on(model.table_name, keys: keys)

  rows = insert_all.map_key_with_value do |column, raw|
    serialized = column_types[column].serialize(raw)
    connection.with_yaml_fallback(serialized)
  end

  node = Arel::Nodes::ValuesList.new(rows)
  connection.visitor.compile(node)
end
# The formatted RETURNING column list, or nil when the insert does not
# return anything.
def returning
  columns = insert_all.returning
  format_columns(columns) if columns
end
# The conflict target for the ON CONFLICT clause:
# * with a +unique_by+ index: its columns, plus the index predicate if any;
# * otherwise, when updating duplicates: the primary keys;
# * otherwise nil.
def conflict_target
  index = insert_all.unique_by

  unless index
    return unless update_duplicates?
    return "(#{format_columns(insert_all.primary_keys)})"
  end

  target = +"(#{format_columns(index.columns)})"
  predicate = index.where
  target << " WHERE #{predicate}" if predicate
  target
end
# Quoted names of the columns that may be rewritten on conflict.
def updatable_columns
  columns = insert_all.updatable_columns
  quote_columns(columns)
end
private
# Internal handles set in #initialize: the connection and the wrapped InsertAll.
attr_reader :connection, :insert_all
# Quoted, comma-separated list of the inserted columns. +keys+ is delegated
# to the wrapped InsertAll.
def columns_list
  format_columns(keys)
end
# Maps every key to the model's attribute type for it. Raises
# UnknownAttributeError for the first key that is not a column of
# +table_name+ according to the schema cache.
def extract_types_from_columns_on(table_name, keys:)
  known = connection.schema_cache.columns_hash(table_name).keys
  missing = (keys - known).first
  raise UnknownAttributeError.new(model.new, missing) if missing

  keys.index_with { |name| model.type_for_attribute(name) }
end
# Formats +columns+ for direct inclusion in the statement.
#
# A collection of column names is quoted and joined with commas. Anything
# that is not a collection is returned unchanged: it is treated as a SQL
# fragment that is already in its final form. This covers an index whose
# +columns+ is a single expression string (reached through +unique_by+ in
# #conflict_target) and a +returning+ option given as a raw string; both used
# to fail with NoMethodError on +map+.
#
# NOTE(review): pass-through values are not quoted, so they must come from
# trusted code (schema metadata or developer-written SQL), never user input.
def format_columns(columns)
  return columns unless columns.respond_to?(:map)

  quote_columns(columns).join(",")
end
# Quotes each column name with the connection's identifier quoting and
# returns the results as an Array.
def quote_columns(columns)
  adapter = connection
  columns.map { |column| adapter.quote_column_name(column) }
end
end
end
end