File: insert_all.rb

package info (click to toggle)
rails 2%3A7.2.2.1%2Bdfsg-7
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 43,352 kB
  • sloc: ruby: 349,799; javascript: 30,703; yacc: 46; sql: 43; sh: 29; makefile: 27
file content (328 lines) | stat: -rw-r--r-- 10,687 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
# frozen_string_literal: true

require "active_support/core_ext/enumerable"

module ActiveRecord
  class InsertAll # :nodoc:
    attr_reader :model, :connection, :inserts, :keys
    attr_reader :on_duplicate, :update_only, :returning, :unique_by, :update_sql

    class << self
      def execute(relation, ...)
        relation.model.with_connection do |c|
          new(relation, c, ...).execute
        end
      end
    end

    def initialize(relation, connection, inserts, on_duplicate:, update_only: nil, returning: nil, unique_by: nil, record_timestamps: nil)
      @relation = relation
      @model, @connection, @inserts = relation.model, connection, inserts.map(&:stringify_keys)
      @on_duplicate, @update_only, @returning, @unique_by = on_duplicate, update_only, returning, unique_by
      @record_timestamps = record_timestamps.nil? ? model.record_timestamps : record_timestamps

      disallow_raw_sql!(on_duplicate)
      disallow_raw_sql!(returning)

      if @inserts.empty?
        @keys = []
      else
        resolve_sti
        resolve_attribute_aliases
        @keys = @inserts.first.keys
      end

      @scope_attributes = relation.scope_for_create.except(@model.inheritance_column)
      @keys |= @scope_attributes.keys
      @keys = @keys.to_set

      @returning = (connection.supports_insert_returning? ? primary_keys : false) if @returning.nil?
      @returning = false if @returning == []

      @unique_by = find_unique_index_for(@unique_by)

      configure_on_duplicate_update_logic
      ensure_valid_options_for_connection!
    end

    def execute
      return ActiveRecord::Result.empty if inserts.empty?

      message = +"#{model} "
      message << "Bulk " if inserts.many?
      message << (on_duplicate == :update ? "Upsert" : "Insert")
      connection.exec_insert_all to_sql, message
    end

    def updatable_columns
      @updatable_columns ||= keys - readonly_columns - unique_by_columns
    end

    def primary_keys
      Array(@model.schema_cache.primary_keys(model.table_name))
    end

    def skip_duplicates?
      on_duplicate == :skip
    end

    def update_duplicates?
      on_duplicate == :update
    end

    def map_key_with_value
      inserts.map do |attributes|
        attributes = attributes.stringify_keys
        attributes.merge!(@scope_attributes)
        attributes.reverse_merge!(timestamps_for_create) if record_timestamps?

        verify_attributes(attributes)

        keys_including_timestamps.map do |key|
          yield key, attributes[key]
        end
      end
    end

    def record_timestamps?
      @record_timestamps
    end

    # TODO: Consider renaming this method, as it only conditionally extends keys, not always
    def keys_including_timestamps
      @keys_including_timestamps ||= if record_timestamps?
        keys + model.all_timestamp_attributes_in_model
      else
        keys
      end
    end

    private
      def has_attribute_aliases?(attributes)
        attributes.keys.any? { |attribute| model.attribute_alias?(attribute) }
      end

      def resolve_sti
        return if model.descends_from_active_record?

        sti_type = model.sti_name
        @inserts = @inserts.map do |insert|
          insert.reverse_merge(model.inheritance_column.to_s => sti_type)
        end
      end

      def resolve_attribute_aliases
        return unless has_attribute_aliases?(@inserts.first)

        @inserts = @inserts.map do |insert|
          insert.transform_keys { |attribute| resolve_attribute_alias(attribute) }
        end

        @update_only = Array(@update_only).map { |attribute| resolve_attribute_alias(attribute) } if @update_only
        @unique_by = Array(@unique_by).map { |attribute| resolve_attribute_alias(attribute) } if @unique_by
      end

      def resolve_attribute_alias(attribute)
        model.attribute_alias(attribute) || attribute
      end

      def configure_on_duplicate_update_logic
        if custom_update_sql_provided? && update_only.present?
          raise ArgumentError, "You can't set :update_only and provide custom update SQL via :on_duplicate at the same time"
        end

        if update_only.present?
          @updatable_columns = Array(update_only)
          @on_duplicate = :update
        elsif custom_update_sql_provided?
          @update_sql = on_duplicate
          @on_duplicate = :update
        elsif @on_duplicate == :update && updatable_columns.empty?
          @on_duplicate = :skip
        end
      end

      def custom_update_sql_provided?
        @custom_update_sql_provided ||= Arel.arel_node?(on_duplicate)
      end

      def find_unique_index_for(unique_by)
        if !connection.supports_insert_conflict_target?
          return if unique_by.nil?

          raise ArgumentError, "#{connection.class} does not support :unique_by"
        end

        name_or_columns = unique_by || model.primary_key
        match = Array(name_or_columns).map(&:to_s)
        sorted_match = match.sort

        if index = unique_indexes.find { |i| match.include?(i.name) || Array(i.columns).sort == sorted_match }
          index
        elsif match == primary_keys
          unique_by.nil? ? nil : ActiveRecord::ConnectionAdapters::IndexDefinition.new(model.table_name, "#{model.table_name}_primary_key", true, match)
        else
          raise ArgumentError, "No unique index found for #{name_or_columns}"
        end
      end

      def unique_indexes
        @model.schema_cache.indexes(model.table_name).select(&:unique)
      end

      def ensure_valid_options_for_connection!
        if returning && !connection.supports_insert_returning?
          raise ArgumentError, "#{connection.class} does not support :returning"
        end

        if skip_duplicates? && !connection.supports_insert_on_duplicate_skip?
          raise ArgumentError, "#{connection.class} does not support skipping duplicates"
        end

        if update_duplicates? && !connection.supports_insert_on_duplicate_update?
          raise ArgumentError, "#{connection.class} does not support upsert"
        end

        if unique_by && !connection.supports_insert_conflict_target?
          raise ArgumentError, "#{connection.class} does not support :unique_by"
        end
      end


      def to_sql
        connection.build_insert_sql(ActiveRecord::InsertAll::Builder.new(self))
      end


      def readonly_columns
        primary_keys + model.readonly_attributes
      end

      def unique_by_columns
        Array(unique_by&.columns)
      end


      def verify_attributes(attributes)
        if keys_including_timestamps != attributes.keys.to_set
          raise ArgumentError, "All objects being inserted must have the same keys"
        end
      end

      def disallow_raw_sql!(value)
        return if !value.is_a?(String) || Arel.arel_node?(value)

        raise ArgumentError, "Dangerous query method (method whose arguments are used as raw " \
                             "SQL) called: #{value}. " \
                             "Known-safe values can be passed " \
                             "by wrapping them in Arel.sql()."
      end

      def timestamps_for_create
        model.all_timestamp_attributes_in_model.index_with(connection.high_precision_current_timestamp)
      end

      class Builder # :nodoc:
        attr_reader :model

        delegate :skip_duplicates?, :update_duplicates?, :keys, :keys_including_timestamps, :record_timestamps?, to: :insert_all

        def initialize(insert_all)
          @insert_all, @model, @connection = insert_all, insert_all.model, insert_all.connection
        end

        def into
          "INTO #{model.quoted_table_name} (#{columns_list})"
        end

        def values_list
          types = extract_types_from_columns_on(model.table_name, keys: keys_including_timestamps)

          values_list = insert_all.map_key_with_value do |key, value|
            next value if Arel::Nodes::SqlLiteral === value
            connection.with_yaml_fallback(types[key].serialize(value))
          end

          connection.visitor.compile(Arel::Nodes::ValuesList.new(values_list))
        end

        def returning
          return unless insert_all.returning

          if insert_all.returning.is_a?(String)
            insert_all.returning
          else
            Array(insert_all.returning).map do |attribute|
              if model.attribute_alias?(attribute)
                "#{quote_column(model.attribute_alias(attribute))} AS #{quote_column(attribute)}"
              else
                quote_column(attribute)
              end
            end.join(",")
          end
        end

        def conflict_target
          if index = insert_all.unique_by
            sql = +"(#{format_columns(index.columns)})"
            sql << " WHERE #{index.where}" if index.where
            sql
          elsif update_duplicates?
            "(#{format_columns(insert_all.primary_keys)})"
          end
        end

        def updatable_columns
          quote_columns(insert_all.updatable_columns)
        end

        def touch_model_timestamps_unless(&block)
          return "" unless update_duplicates? && record_timestamps?

          model.timestamp_attributes_for_update_in_model.filter_map do |column_name|
            if touch_timestamp_attribute?(column_name)
              "#{column_name}=(CASE WHEN (#{updatable_columns.map(&block).join(" AND ")}) THEN #{model.quoted_table_name}.#{column_name} ELSE #{connection.high_precision_current_timestamp} END),"
            end
          end.join
        end

        def raw_update_sql
          insert_all.update_sql
        end

        alias raw_update_sql? raw_update_sql

        private
          attr_reader :connection, :insert_all

          def touch_timestamp_attribute?(column_name)
            insert_all.updatable_columns.exclude?(column_name)
          end

          def columns_list
            format_columns(insert_all.keys_including_timestamps)
          end

          def extract_types_from_columns_on(table_name, keys:)
            columns = @model.schema_cache.columns_hash(table_name)

            unknown_column = (keys - columns.keys).first
            raise UnknownAttributeError.new(model.new, unknown_column) if unknown_column

            keys.index_with { |key| model.type_for_attribute(key) }
          end

          def format_columns(columns)
            columns.respond_to?(:map) ? quote_columns(columns).join(",") : columns
          end

          def quote_columns(columns)
            columns.map { |column| quote_column(column) }
          end

          def quote_column(column)
            connection.quote_column_name(column)
          end
      end
  end
end