1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
|
# frozen_string_literal: true
module ActiveRecord::Import::PostgreSQLAdapter
include ActiveRecord::Import::ImportSupport
include ActiveRecord::Import::OnDuplicateKeyUpdateSupport
MIN_VERSION_FOR_UPSERT = 90_500
# Runs one multi-row INSERT and gathers whatever its RETURNING clause yields.
#
# sql     - the INSERT prefix as a String, or an Array whose first element is
#           that prefix and whose remaining elements are trailing clauses
#           (joined with a single space and appended after the values).
#           The Array is only read; it is not modified.
# values  - collection of SQL value tuples; joined with ','.
# options - import options. Reads :returning, :primary_key, :no_returning and
#           :recursive here (and :model through returning_selections); may
#           assign :returning_columns.
# args    - forwarded untouched to insert / select_all / select_values.
#
# Returns an ActiveRecord::Import::Result built from an empty Array, the
# insert count (always 1 here), the collected ids and the collected results.
def insert_many( sql, values, options = {}, *args ) # :nodoc:
  number_of_inserts = 1
  returned_values = {}
  ids = []
  results = []

  # Split head/tail without shifting, so the caller's array stays intact.
  base_sql, post_sql = if sql.is_a?( String )
    [sql, '']
  elsif sql.is_a?( Array )
    [sql.first, sql.drop( 1 ).join( ' ' )]
  end

  sql2insert = base_sql + values.join( ',' ) + post_sql

  selections = returning_selections(options)
  if selections.blank? || (options[:no_returning] && !options[:recursive])
    # Nothing is selected back, so a plain insert is enough.
    insert( sql2insert, *args )
  else
    returned_values = if selections.size > 1
      # Select composite columns
      db_result = select_all( sql2insert, *args )
      { values: db_result.rows, columns: db_result.columns }
    else
      { values: select_values( sql2insert, *args ) }
    end
    # The INSERT went through a select call; presumably that path does not
    # invalidate the query cache by itself, hence the explicit clear.
    clear_query_cache if query_cache_enabled
  end

  if options[:returning].blank?
    # Only primary keys were requested.
    ids = Array(returned_values[:values])
  elsif options[:primary_key].blank?
    # Only custom returning columns were requested.
    options[:returning_columns] ||= returned_values[:columns]
    results = Array(returned_values[:values])
  else
    # split primary key and returning columns
    ids, results, options[:returning_columns] = split_ids_and_results(returned_values, options)
  end

  ActiveRecord::Import::Result.new([], number_of_inserts, ids, results)
end
# Separates primary-key values from the remaining returned values.
#
# selections - Hash with :columns (column names) and :values (one entry per
#              returned row).
# options    - reads :primary_key (a single name or an Array of names).
#
# Returns [ids, returning_values, returning_columns]. With exactly one
# primary-key column every id is a bare value rather than a one-element
# Array; the same unwrapping applies to returning_values when exactly one
# non-key column is left.
def split_ids_and_results( selections, options )
  all_columns = Array(selections[:columns])
  rows = Array(selections[:values]).map { |row| Array(row) }

  key_positions = Array(options[:primary_key]).map { |key| all_columns.index(key) }
  returning_columns = all_columns.each_index
                                 .reject { |pos| key_positions.include?(pos) }
                                 .map { |pos| all_columns[pos] }
  # Positions are looked up by name again, first match wins.
  returning_positions = returning_columns.map { |name| all_columns.index(name) }

  ids = rows.map { |row| key_positions.map { |pos| row[pos] } }
  returning_values = rows.map { |row| returning_positions.map { |pos| row[pos] } }

  ids = ids.map(&:first) if key_positions.size == 1
  returning_values = returning_values.map(&:first) if returning_columns.size == 1

  [ids, returning_values, returning_columns]
end
# Builds the SQL expression that fetches the next value of a sequence.
#
# sequence_name - name of the sequence; interpolated as-is between single
#                 quotes.
#
# Returns a String of the form nextval('<sequence_name>').
def next_value_for_sequence(sequence_name)
  format("nextval('%s')", sequence_name)
end
# Collects the SQL fragments that follow the VALUES tuples of the INSERT.
#
# table_name - forwarded to the conflict helper and to super.
# options    - import options; reads :ignore, :on_duplicate_key_ignore,
#              :on_duplicate_key_update, :recursive and :no_returning here.
#
# Returns the Array of fragments: an optional ON CONFLICT DO NOTHING clause,
# whatever super contributes, and an optional RETURNING clause.
def post_sql_statements( table_name, options ) # :nodoc:
  fragments = []

  if supports_on_duplicate_key_update?
    wants_ignore = options[:ignore] || options[:on_duplicate_key_ignore]
    # Options :recursive and :on_duplicate_key_ignore are mutually exclusive
    overridden = options[:on_duplicate_key_update] || options[:recursive]
    if wants_ignore && !overridden
      fragments << sql_for_on_duplicate_key_ignore( table_name, options[:on_duplicate_key_ignore] )
    end
  elsif logger && options[:on_duplicate_key_ignore] && !options[:on_duplicate_key_update]
    logger.warn "Ignoring on_duplicate_key_ignore because it is not supported by the database."
  end

  fragments += super(table_name, options)

  selections = returning_selections(options)
  # :no_returning suppresses the clause unless the import is recursive.
  if selections.present? && (!options[:no_returning] || options[:recursive])
    fragments << " RETURNING #{selections.join(', ')}"
  end

  fragments
end
# Lists the expressions to place in the RETURNING clause.
#
# options - reads :model (its column_names), :primary_key and :returning.
#
# Returns an Array holding the primary key entries followed by the
# :returning entries. Entries whose string form matches one of the model's
# column names are wrapped in double quotes; all others pass through
# unchanged.
def returning_selections(options)
  known_columns = Array(options[:model].column_names)

  requested = [:primary_key, :returning].flat_map do |key|
    options[key].present? ? Array(options[key]) : []
  end

  requested.map do |item|
    next item unless known_columns.include?(item.to_s)

    %("#{item}")
  end
end
# Add a column to be updated on duplicate key update
#
# column  - column name; converted with to_sym before being stored.
# options - import options; only :on_duplicate_key_update is consulted and it
#           is modified in place.
#
# When the setting is an Array the symbol is appended unless already there.
# When it is a Hash its :columns entry (created as [] if missing) is updated:
# an Array gets the symbol appended unless present, a Hash gets the symbol
# mapped to itself. Any other setting is left alone.
def add_column_for_on_duplicate_key_update( column, options = {} ) # :nodoc:
  setting = options[:on_duplicate_key_update]

  case setting
  when Array
    key = column.to_sym
    setting << key unless setting.include?( key )
  when Hash
    targets = setting.fetch( :columns ) { setting[:columns] = [] }
    if targets.is_a?( Array )
      key = column.to_sym
      targets << key unless targets.include?( key )
    elsif targets.is_a?( Hash )
      key = column.to_sym
      targets[key] = key
    end
  end
end
# Returns a generated ON CONFLICT DO NOTHING statement given the passed
# in +args+.
#
# table_name - not used by this implementation.
# args       - only the first element is examined; when it is a Hash it is
#              handed to sql_for_conflict_target to build the conflict target.
def sql_for_on_duplicate_key_ignore( table_name, *args ) # :nodoc:
  settings = args.first
  target = settings.is_a?( Hash ) ? sql_for_conflict_target( settings ) : nil
  " ON CONFLICT #{target}DO NOTHING"
end
# Returns a generated ON CONFLICT DO UPDATE statement given the passed
# in +args+.
#
# table_name - forwarded to the helpers that build the default conflict
#              target and the SET list.
# args       - [setting, primary_key, locking_column]. An Array or String
#              setting is treated as { columns: setting }; anything that is
#              still not a Hash makes the method return nil.
#
# The Hash may carry :columns (Array, Hash or String), :condition, and the
# keys understood by sql_for_conflict_target. An empty :columns produces
# ON CONFLICT ... DO NOTHING instead of an update.
#
# Raises ArgumentError when no conflict target can be determined or when
# :columns has an unsupported type.
def sql_for_on_duplicate_key_update( table_name, *args ) # :nodoc:
  setting, primary_key, locking_column = args

  case setting
  when Array, String then setting = { columns: setting }
  end
  return unless setting.is_a?( Hash )

  # Unary plus yields a mutable String even under frozen_string_literal.
  sql = +' ON CONFLICT '
  target = sql_for_conflict_target( setting )
  columns = setting.fetch( :columns, [] )
  condition = setting[:condition]

  nothing_to_update = columns.respond_to?( :empty? ) && columns.empty?
  return sql << "#{target}DO NOTHING" if nothing_to_update

  # Fall back to the primary key when no explicit target was given.
  target ||= sql_for_default_conflict_target( table_name, primary_key )
  raise ArgumentError, 'Expected :conflict_target or :constraint_name to be specified' unless target

  sql << "#{target}DO UPDATE SET "
  sql << case columns
         when Array
           sql_for_on_duplicate_key_update_as_array( table_name, locking_column, columns )
         when Hash
           sql_for_on_duplicate_key_update_as_hash( table_name, locking_column, columns )
         when String
           columns
         else
           raise ArgumentError, 'Expected :columns to be an Array or Hash'
         end

  sql << " WHERE #{condition}" if condition.present?
  sql
end
# Builds the SET list for DO UPDATE from a list of column names: each column
# is assigned the value of the same column in EXCLUDED.
#
# table_name     - forwarded to increment_locking_column!.
# locking_column - forwarded to increment_locking_column!.
# arr            - Array of column names; each is passed through
#                  quote_column_name.
#
# Returns the assignments joined with ','.
def sql_for_on_duplicate_key_update_as_array( table_name, locking_column, arr ) # :nodoc:
  assignments = arr.map { |column| quote_column_name( column ) }
                   .map { |quoted| "#{quoted}=EXCLUDED.#{quoted}" }

  # The helper receives the list itself before it is joined.
  increment_locking_column!(table_name, assignments, locking_column)
  assignments.join( ',' )
end
# Builds the SET list for DO UPDATE from a column mapping: each key column is
# assigned the value of its mapped column in EXCLUDED.
#
# table_name     - forwarded to increment_locking_column!.
# locking_column - forwarded to increment_locking_column!.
# hsh            - Hash of target column => source column; both sides are
#                  passed through quote_column_name.
#
# Returns the assignments joined with ','.
def sql_for_on_duplicate_key_update_as_hash( table_name, locking_column, hsh ) # :nodoc:
  assignments = hsh.map do |target, source|
    "#{quote_column_name( target )}=EXCLUDED.#{quote_column_name( source )}"
  end

  # The helper receives the list itself before it is joined.
  increment_locking_column!(table_name, assignments, locking_column)
  assignments.join( ',' )
end
# Builds the conflict-target portion of an ON CONFLICT clause.
#
# args - Hash; reads :constraint_name, :conflict_target and :index_predicate.
#
# A present :constraint_name wins and yields "ON CONSTRAINT <name> ".
# Otherwise a present :conflict_target yields "(<col>, <col>) ", with blank
# entries dropped and "WHERE <predicate> " appended when :index_predicate is
# truthy. Returns nil when neither is present. Every returned String ends
# with a space so the caller can append the action directly.
def sql_for_conflict_target( args = {} )
  constraint = args[:constraint_name]
  return "ON CONSTRAINT #{constraint} " if constraint.present?

  target = args[:conflict_target]
  return unless target.present?

  column_list = Array( target ).reject( &:blank? ).join( ', ' )
  predicate = args[:index_predicate]
  predicate ? "(#{column_list}) WHERE #{predicate} " : "(#{column_list}) "
end
# Builds a conflict target from the primary key.
#
# table_name  - not used by this implementation.
# primary_key - a single column name or an Array of names.
#
# Returns "(<col>, <col>) " with a trailing space, or nil when the joined
# key list is blank.
def sql_for_default_conflict_target( table_name, primary_key )
  key_list = Array(primary_key).join(', ')
  return if key_list.blank?

  "(#{key_list}) "
end
# Return true if the statement is a duplicate key record error
#
# exception - the object to inspect.
#
# The check passes only for an ActiveRecord::StatementInvalid whose string
# form contains 'duplicate key'.
def duplicate_key_update_error?(exception) # :nodoc:
  return false unless exception.is_a?(ActiveRecord::StatementInvalid)

  exception.to_s.include?('duplicate key')
end
# Tells whether ON CONFLICT clauses may be generated: true when
# database_version is at least MIN_VERSION_FOR_UPSERT.
def supports_on_duplicate_key_update?
  current_version = database_version
  current_version >= MIN_VERSION_FOR_UPSERT
end
# Always answers true for this adapter.
# NOTE(review): presumably consulted by the import code to decide whether
# returned ids get written back onto the imported objects — confirm against
# the caller.
def supports_setting_primary_key_of_imported_objects?
true
end
# Version number compared against MIN_VERSION_FOR_UPSERT.
#
# Uses postgresql_version when that name is defined in this context and
# defers to the inherited implementation otherwise.
def database_version
  return postgresql_version if defined?(postgresql_version)

  super
end
end
|