File: import.rb

Package: ruby-activerecord-import 1.4.1-1 (Debian bookworm)

# frozen_string_literal: true

require "ostruct"

module ActiveRecord::Import::ConnectionAdapters; end

module ActiveRecord::Import #:nodoc:
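  # Result bundles what an import call returns: the instances that failed
  # validation, the number of INSERT statements executed, the primary key ids
  # assigned by the database (when the adapter supports returning them), and
  # any requested returning-column results.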
  Result = Struct.new(:failed_instances, :num_inserts, :ids, :results)

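  # Included by connection adapters that support multi-row INSERT statements;
  # it is what makes ActiveRecord::Base.supports_import? return true.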
  module ImportSupport #:nodoc:
    def supports_import? #:nodoc:
      true
    end
  end

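  # Included by connection adapters that support ON DUPLICATE KEY UPDATE /
  # ON CONFLICT DO UPDATE; see ActiveRecord::Base.supports_on_duplicate_key_update?.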
  module OnDuplicateKeyUpdateSupport #:nodoc:
    def supports_on_duplicate_key_update? #:nodoc:
      true
    end
  end

  class MissingColumnError < StandardError
    def initialize(name, index)
      super "Missing column for value <#{name}> at index #{index}"
    end
  end

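  # Validator performs a slimmed-down validation pass over models being
  # imported. It duplicates the model's validation callback chain, rewrites
  # presence validators on belongs_to associations to check the foreign key
  # column instead of the association, and disables uniqueness validators
  # unless :validate_uniqueness is set (see the :validate option on
  # ActiveRecord::Base.import).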
  class Validator
    def initialize(klass, options = {})
      @options = options
      @validator_class = klass
      init_validations(klass)
    end

    def init_validations(klass)
      @validate_callbacks = klass._validate_callbacks.dup

      @validate_callbacks.each_with_index do |callback, i|
        filter = callback.respond_to?(:raw_filter) ? callback.raw_filter : callback.filter
        next unless filter.class.name =~ /Validations::PresenceValidator/ ||
                    (!@options[:validate_uniqueness] &&
                     filter.is_a?(ActiveRecord::Validations::UniquenessValidator))

        callback = callback.dup
        filter = filter.dup
        attrs = filter.instance_variable_get(:@attributes).dup

        if filter.is_a?(ActiveRecord::Validations::UniquenessValidator)
          attrs = []
        else
          associations = klass.reflect_on_all_associations(:belongs_to)
          associations.each do |assoc|
            if (index = attrs.index(assoc.name))
              key = assoc.foreign_key.is_a?(Array) ? assoc.foreign_key.map(&:to_sym) : assoc.foreign_key.to_sym
              attrs[index] = key unless attrs.include?(key)
            end
          end
        end

        filter.instance_variable_set(:@attributes, attrs.flatten)

        if @validate_callbacks.respond_to?(:chain, true)
          @validate_callbacks.send(:chain).tap do |chain|
            callback.instance_variable_set(:@filter, filter)
            chain[i] = callback
          end
        else
          callback.raw_filter = filter
          callback.filter = callback.send(:_compile_filter, filter)
          @validate_callbacks[i] = callback
        end
      end
    end

    def valid_model?(model)
      init_validations(model.class) unless model.class == @validator_class

      validation_context = @options[:validate_with_context]
      validation_context ||= (model.new_record? ? :create : :update)
      current_context = model.send(:validation_context)

      begin
        model.send(:validation_context=, validation_context)
        model.errors.clear

        model.run_callbacks(:validation) do
          if defined?(ActiveSupport::Callbacks::Filters::Environment) # ActiveRecord >= 4.1
            runner = @validate_callbacks.compile
            env = ActiveSupport::Callbacks::Filters::Environment.new(model, false, nil)
            if runner.respond_to?(:call) # ActiveRecord < 5.1
              runner.call(env)
            else # ActiveRecord 5.1
              # Note that this is a gross simplification of ActiveSupport::Callbacks#run_callbacks.
              # It's technically possible for there to exist an "around" callback in the
              # :validate chain, but this would be an aberration, since Rails doesn't define
              # "around_validate". Still, rather than silently ignoring such callbacks, we
              # explicitly raise a RuntimeError, since activerecord-import was asked to perform
              # validations and it's unable to do so.
              #
              # The alternative here would be to copy-and-paste the bulk of the
              # ActiveSupport::Callbacks#run_callbacks method, which is undesirable if there's
              # no real-world use case for it.
              raise "The :validate callback chain contains an 'around' callback, which is unsupported" unless runner.final?
              runner.invoke_before(env)
              runner.invoke_after(env)
            end
          elsif @validate_callbacks.method(:compile).arity == 0 # ActiveRecord = 4.0
            model.instance_eval @validate_callbacks.compile
          else # ActiveRecord 3.x
            model.instance_eval @validate_callbacks.compile(nil, model)
          end
        end

        model.errors.empty?
      ensure
        model.send(:validation_context=, current_context)
      end
    end
  end
end

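# Allow import/bulk_import to be called directly on has_many (and similar)
# associations. The owner's primary key, and the type column for polymorphic
# associations, are filled in on each imported row. An illustrative example,
# assuming a Post model with has_many :comments:
#
#   post.comments.import [ :body ], [ [ 'first' ], [ 'second' ] ]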
class ActiveRecord::Associations::CollectionProxy
  def bulk_import(*args, &block)
    @association.bulk_import(*args, &block)
  end
  alias import bulk_import unless respond_to? :import
end

class ActiveRecord::Associations::CollectionAssociation
  def bulk_import(*args, &block)
    unless owner.persisted?
      raise ActiveRecord::RecordNotSaved, "You cannot call import unless the parent is saved"
    end

    options = args.last.is_a?(Hash) ? args.pop : {}

    model_klass = reflection.klass
    symbolized_foreign_key = reflection.foreign_key.to_sym

    symbolized_column_names = if model_klass.connection.respond_to?(:supports_virtual_columns?) && model_klass.connection.supports_virtual_columns?
      model_klass.columns.reject(&:virtual?).map { |c| c.name.to_sym }
    else
      model_klass.column_names.map(&:to_sym)
    end

    owner_primary_key = reflection.active_record_primary_key.to_sym
    owner_primary_key_value = owner.send(owner_primary_key)

    # assume array of model objects
    if args.last.is_a?( Array ) && args.last.first.is_a?(ActiveRecord::Base)
      if args.length == 2
        models = args.last
        column_names = args.first.dup
      else
        models = args.first
        column_names = symbolized_column_names
      end

      unless symbolized_column_names.include?(symbolized_foreign_key)
        column_names << symbolized_foreign_key
      end

      models.each do |m|
        m.public_send "#{symbolized_foreign_key}=", owner_primary_key_value
        m.public_send "#{reflection.type}=", owner.class.name if reflection.type
      end

      return model_klass.bulk_import column_names, models, options

    # supports array of hash objects
    elsif args.last.is_a?( Array ) && args.last.first.is_a?(Hash)
      if args.length == 2
        array_of_hashes = args.last
        column_names = args.first.dup
        allow_extra_hash_keys = true
      else
        array_of_hashes = args.first
        column_names = array_of_hashes.first.keys
        allow_extra_hash_keys = false
      end

      symbolized_column_names = column_names.map(&:to_sym)
      unless symbolized_column_names.include?(symbolized_foreign_key)
        column_names << symbolized_foreign_key
      end

      if reflection.type && !symbolized_column_names.include?(reflection.type.to_sym)
        column_names << reflection.type.to_sym
      end

      array_of_attributes = array_of_hashes.map do |h|
        error_message = model_klass.send(:validate_hash_import, h, symbolized_column_names, allow_extra_hash_keys)

        raise ArgumentError, error_message if error_message

        column_names.map do |key|
          if key == symbolized_foreign_key
            owner_primary_key_value
          elsif reflection.type && key == reflection.type.to_sym
            owner.class.name
          else
            h[key]
          end
        end
      end

      return model_klass.bulk_import column_names, array_of_attributes, options

    # supports empty array
    elsif args.last.is_a?( Array ) && args.last.empty?
      return ActiveRecord::Import::Result.new([], 0, [], [])

    # supports 2-element array and array
    elsif args.size == 2 && args.first.is_a?( Array ) && args.last.is_a?( Array )
      column_names, array_of_attributes = args

      # dup the passed args so we don't modify unintentionally
      column_names = column_names.dup
      array_of_attributes = array_of_attributes.map(&:dup)

      symbolized_column_names = column_names.map(&:to_sym)

      if symbolized_column_names.include?(symbolized_foreign_key)
        index = symbolized_column_names.index(symbolized_foreign_key)
        array_of_attributes.each { |attrs| attrs[index] = owner_primary_key_value }
      else
        column_names << symbolized_foreign_key
        array_of_attributes.each { |attrs| attrs << owner_primary_key_value }
      end

      if reflection.type
        symbolized_type = reflection.type.to_sym
        if symbolized_column_names.include?(symbolized_type)
          index = symbolized_column_names.index(symbolized_type)
          array_of_attributes.each { |attrs| attrs[index] = owner.class.name }
        else
          column_names << symbolized_type
          array_of_attributes.each { |attrs| attrs << owner.class.name }
        end
      end

      return model_klass.bulk_import column_names, array_of_attributes, options
    else
      raise ArgumentError, "Invalid arguments!"
    end
  end
  alias import bulk_import unless respond_to? :import
end

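# Hooks establish_connection so that adapter-specific import support is
# loaded for whichever connection pool is established.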
module ActiveRecord::Import::Connection
  def establish_connection(args = nil)
    conn = super(args)
    ActiveRecord::Import.load_from_connection_pool connection_pool
    conn
  end
end

class ActiveRecord::Base
  class << self
    prepend ActiveRecord::Import::Connection

    # Returns true if the current database connection adapter
    # supports import functionality, otherwise returns false.
    def supports_import?(*args)
      connection.respond_to?(:supports_import?) && connection.supports_import?(*args)
    end

    # Returns true if the current database connection adapter
    # supports on duplicate key update functionality, otherwise
    # returns false.
    def supports_on_duplicate_key_update?
      connection.respond_to?(:supports_on_duplicate_key_update?) && connection.supports_on_duplicate_key_update?
    end

    # returns true if the current database connection adapter
    # supports setting the primary key of bulk imported models, otherwise
    # returns false
    def supports_setting_primary_key_of_imported_objects?
      connection.respond_to?(:supports_setting_primary_key_of_imported_objects?) && connection.supports_setting_primary_key_of_imported_objects?
    end

    # Imports a collection of values to the database.
    #
    # This is more efficient than using ActiveRecord::Base#create or
    # ActiveRecord::Base#save multiple times. This method works well if
    # you want to create more than one record at a time and do not care
    # about having ActiveRecord objects returned for each record
    # inserted.
    #
    # This can be used with or without validations. It does not utilize
    # the ActiveRecord::Callbacks during creation/modification while
    # performing the import.
    #
    # == Usage
    #  Model.import array_of_models
    #  Model.import column_names, array_of_models
    #  Model.import array_of_hash_objects
    #  Model.import column_names, array_of_hash_objects
    #  Model.import column_names, array_of_values
    #  Model.import column_names, array_of_values, options
    #
    # ==== Model.import array_of_models
    #
    # With this form you can call _import_ passing in an array of model
    # objects that you want updated.
    #
    # ==== Model.import column_names, array_of_values
    #
    # The first parameter +column_names+ is an array of symbols or
    # strings which specify the columns that you want to update.
    #
    # The second parameter, +array_of_values+, is an array of
    # arrays. Each subarray is a single set of values for a new
    # record. The order of values in each subarray should match up to
    # the order of the +column_names+.
    #
    # ==== Model.import column_names, array_of_values, options
    #
    # The first two parameters are the same as the above form. The third
    # parameter, +options+, is a hash. This is optional. Please see
    # below for what +options+ are available.
    #
    # == Options
    # * +validate+ - true|false, tells import whether or not to use
    #   ActiveRecord validations. Validations are enforced by default.
    #   It skips the uniqueness validation for performance reasons.
    #   You can find more details here:
    #   https://github.com/zdennis/activerecord-import/issues/228
    # * +ignore+ - true|false, an alias for on_duplicate_key_ignore.
    # * +on_duplicate_key_ignore+ - true|false, tells import to discard
    #   records that contain duplicate keys. For Postgres 9.5+ it adds
    #   ON CONFLICT DO NOTHING, for MySQL it uses INSERT IGNORE, and for
    #   SQLite it uses INSERT OR IGNORE. Cannot be enabled on a
    #   recursive import. For database adapters that normally support
    #   setting primary keys on imported objects, this option prevents
    #   that from occurring.
    # * +on_duplicate_key_update+ - :all, an Array, or Hash, tells import to
    #   use MySQL's ON DUPLICATE KEY UPDATE or Postgres/SQLite ON CONFLICT
    #   DO UPDATE ability. See On Duplicate Key Update below.
    # * +synchronize+ - an array of ActiveRecord instances for the model
    #   that you are currently importing data into. This synchronizes
    #   existing model instances in memory with updates from the import.
    # * +timestamps+ - true|false, tells import not to add timestamps
    #   (if false) even if record timestamps are enabled in ActiveRecord::Base
    # * +recursive+ - true|false, tells import to import all has_many/has_one
    #   associations if the adapter supports setting the primary keys of the
    #   newly imported objects. PostgreSQL only.
    # * +batch_size+ - an integer value to specify the max number of records to
    #   include per insert. Defaults to the total number of records to import.
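    # * +batch_progress+ - a callable (anything responding to +call+) invoked
    #   after each batch with (import_size, number_of_batches, current_batch_number,
    #   batch_duration_in_seconds). Only used when +batch_size+ is set.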
    #
    # == Examples
    #  class BlogPost < ActiveRecord::Base ; end
    #
    #  # Example using array of model objects
    #  posts = [ BlogPost.new(author_name: 'Zach Dennis', title: 'AREXT'),
    #            BlogPost.new(author_name: 'Zach Dennis', title: 'AREXT2'),
    #            BlogPost.new(author_name: 'Zach Dennis', title: 'AREXT3') ]
    #  BlogPost.import posts
    #
    #  # Example using array_of_hash_objects
    #  # NOTE: column_names will be determined by using the keys of the first hash in the array. If later hashes in the
    #  # array have different keys an exception will be raised. If you have hashes to import with different sets of keys
    #  # we recommend grouping these into batches before importing.
    #  values = [ {author_name: 'zdennis', title: 'test post'}, {author_name: 'jdoe', title: 'another test post'} ]
    #  BlogPost.import values
    #
    #  # Example using column_names and array_of_hash_objects
    #  columns = [ :author_name, :title ]
    #  values = [ {author_name: 'zdennis', title: 'test post'}, {author_name: 'jdoe', title: 'another test post'} ]
    #  BlogPost.import columns, values
    #
    #  # Example using column_names and array_of_values
    #  columns = [ :author_name, :title ]
    #  values = [ [ 'zdennis', 'test post' ], [ 'jdoe', 'another test post' ] ]
    #  BlogPost.import columns, values
    #
    #  # Example using column_names, array_of_values and options
    #  columns = [ :author_name, :title ]
    #  values = [ [ 'zdennis', 'test post' ], [ 'jdoe', 'another test post' ] ]
    #  BlogPost.import( columns, values, validate: false  )
    #
    #  # Example synchronizing existing instances in memory
    #  post = BlogPost.where(author_name: 'zdennis').first
    #  puts post.author_name # => 'zdennis'
    #  columns = [ :author_name, :title ]
    #  values = [ [ 'yoda', 'test post' ] ]
    #  BlogPost.import columns, values, synchronize: [ post ]
    #  puts post.author_name # => 'yoda'
    #
    #  # Example synchronizing unsaved/new instances in memory by using a unique imported field
    #  posts = [BlogPost.new(title: "Foo"), BlogPost.new(title: "Bar")]
    #  BlogPost.import posts, synchronize: posts, synchronize_keys: [:title]
    #  puts posts.first.persisted? # => true
    #
    # == On Duplicate Key Update (MySQL)
    #
    # The :on_duplicate_key_update option can be either :all, an Array, or a Hash.
    #
    # ==== Using :all
    #
    # The :on_duplicate_key_update option can be set to :all. All columns
    # other than the primary key are updated. If a list of column names is
    # supplied, only those columns will be updated. Below is an example:
    #
    #   BlogPost.import columns, values, on_duplicate_key_update: :all
    #
    # ==== Using an Array
    #
    # The :on_duplicate_key_update option can be an array of column
    # names. The column names are the only fields that are updated if
    # a duplicate record is found. Below is an example:
    #
    #   BlogPost.import columns, values, on_duplicate_key_update: [ :date_modified, :content, :author ]
    #
    # ====  Using A Hash
    #
    # The :on_duplicate_key_update option can be a hash of column names
    # to model attribute name mappings. This gives you finer grained
    # control over what fields are updated with what attributes on your
    # model. Below is an example:
    #
    #   BlogPost.import columns, attributes, on_duplicate_key_update: { title: :title }
    #
    # == On Duplicate Key Update (Postgres 9.5+ and SQLite 3.24+)
    #
    # The :on_duplicate_key_update option can be :all, an Array, or a Hash with up to
    # three attributes, :conflict_target (and optionally :index_predicate) or
    # :constraint_name (Postgres), and :columns.
    #
    # ==== Using :all
    #
    # The :on_duplicate_key_update option can be set to :all. All columns
    # other than the primary key are updated. If a list of column names is
    # supplied, only those columns will be updated. Below is an example:
    #
    #   BlogPost.import columns, values, on_duplicate_key_update: :all
    #
    # ==== Using an Array
    #
    # The :on_duplicate_key_update option can be an array of column
    # names. This option only handles inserts that conflict with the
    # primary key. If a table does not have a primary key, this will
    # not work. The column names are the only fields that are updated
    # if a duplicate record is found. Below is an example:
    #
    #   BlogPost.import columns, values, on_duplicate_key_update: [ :date_modified, :content, :author ]
    #
    # ====  Using a Hash
    #
    # The :on_duplicate_key_update option can be a hash with up to three
    # attributes, :conflict_target (and optionally :index_predicate) or
    # :constraint_name, and :columns. Unlike MySQL, Postgres requires the
    # conflicting constraint to be explicitly specified. Using this option
    # allows you to specify a constraint other than the primary key.
    #
    # ===== :conflict_target
    #
    # The :conflict_target attribute specifies the columns that make up the
    # conflicting unique constraint and can be a single column or an array of
    # column names. This attribute is ignored if :constraint_name is included,
    # but it is the preferred method of identifying a constraint. It will
    # default to the primary key. Below is an example:
    #
    #   BlogPost.import columns, values, on_duplicate_key_update: { conflict_target: [ :author_id, :slug ], columns: [ :date_modified ] }
    #
    # ===== :index_predicate
    #
    # The :index_predicate attribute optionally specifies a WHERE condition
    # on :conflict_target, which is required for matching against partial
    # indexes. This attribute is ignored if :constraint_name is included.
    # Below is an example:
    #
    #   BlogPost.import columns, values, on_duplicate_key_update: { conflict_target: [ :author_id, :slug ], index_predicate: 'status <> 0', columns: [ :date_modified ] }
    #
    # ===== :constraint_name
    #
    # The :constraint_name attribute explicitly identifies the conflicting
    # unique index by name. Postgres documentation discourages using this method
    # of identifying an index unless absolutely necessary. Below is an example:
    #
    #   BlogPost.import columns, values, on_duplicate_key_update: { constraint_name: :blog_posts_pkey, columns: [ :date_modified ] }
    #
    # ===== :condition
    #
    # The :condition attribute optionally specifies a WHERE condition
    # on :conflict_action. Only rows for which this expression returns true will be updated.
    # Note that it's evaluated last, after a conflict has been identified as a candidate to update.
    # Below is an example:
    #
    #   BlogPost.import columns, values, on_duplicate_key_update: { conflict_target: [ :author_id ], condition: "blog_posts.title NOT LIKE '%sample%'", columns: [ :author_name ] }
    #
    # ===== :columns
    #
    # The :columns attribute can be either :all, an Array, or a Hash.
    #
    # ===== Using :all
    #
    # The :columns attribute can be :all. All columns other than the primary key will be updated.
    # If a list of column names is supplied, only those columns will be updated.
    # Below is an example:
    #
    #   BlogPost.import columns, values, on_duplicate_key_update: { conflict_target: :slug, columns: :all }
    #
    # ===== Using an Array
    #
    # The :columns attribute can be an array of column names. The column names
    # are the only fields that are updated if a duplicate record is found.
    # Below is an example:
    #
    #   BlogPost.import columns, values, on_duplicate_key_update: { conflict_target: :slug, columns: [ :date_modified, :content, :author ] }
    #
    # =====  Using a Hash
    #
    # The :columns option can be a hash of column names to model attribute name
    # mappings. This gives you finer grained control over what fields are updated
    # with what attributes on your model. Below is an example:
    #
    #   BlogPost.import columns, attributes, on_duplicate_key_update: { conflict_target: :slug, columns: { title: :title } }
    #
    # = Returns
    # This returns an object which responds to +failed_instances+ and +num_inserts+.
    # * failed_instances - an array of objects that failed validation and were not committed to the database. An empty array if no validation is performed.
    # * num_inserts - the number of insert statements it took to import the data
    # * ids - the primary keys of the imported records, if the adapter supports it; otherwise an empty array.
    # * results - import results if the adapter supports it, otherwise an empty array.
    def bulk_import(*args)
      if args.first.is_a?( Array ) && args.first.first.is_a?(ActiveRecord::Base)
        options = {}
        options.merge!( args.pop ) if args.last.is_a?(Hash)

        models = args.first
        import_helper(models, options)
      else
        import_helper(*args)
      end
    end
    alias import bulk_import unless ActiveRecord::Base.respond_to? :import

    # Imports a collection of values if all values are valid. Import fails at the
    # first encountered validation error and raises ActiveRecord::RecordInvalid
    # with the failed instance.
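    #
    # A minimal illustrative example (assuming the BlogPost model from the
    # examples above validates the presence of +title+):
    #
    #   BlogPost.import! [ :title ], [ [ 'first post' ], [ nil ] ]
    #   # => raises ActiveRecord::RecordInvalid when it reaches the nil title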
    def bulk_import!(*args)
      options = args.last.is_a?( Hash ) ? args.pop : {}
      options[:validate] = true
      options[:raise_error] = true

      bulk_import(*args, options)
    end
    alias import! bulk_import! unless ActiveRecord::Base.respond_to? :import!

    def import_helper( *args )
      options = { model: self, validate: true, timestamps: true, track_validation_failures: false }
      options.merge!( args.pop ) if args.last.is_a? Hash
      # making sure that current model's primary key is used
      options[:primary_key] = primary_key
      options[:locking_column] = locking_column if attribute_names.include?(locking_column)

      is_validating = options[:validate_with_context].present? ? true : options[:validate]
      validator = ActiveRecord::Import::Validator.new(self, options)

      # assume array of model objects
      if args.last.is_a?( Array ) && args.last.first.is_a?(ActiveRecord::Base)
        if args.length == 2
          models = args.last
          column_names = args.first.dup
        else
          models = args.first
          column_names = if connection.respond_to?(:supports_virtual_columns?) && connection.supports_virtual_columns?
            columns.reject(&:virtual?).map(&:name)
          else
            self.column_names.dup
          end
        end

        if models.first.id.nil?
          Array(primary_key).each do |c|
            if column_names.include?(c) && columns_hash[c].type == :uuid
              column_names.delete(c)
            end
          end
        end

        update_attrs = if record_timestamps && options[:timestamps]
          if respond_to?(:timestamp_attributes_for_update, true)
            send(:timestamp_attributes_for_update).map(&:to_sym)
          else
            allocate.send(:timestamp_attributes_for_update_in_model)
          end
        end

        array_of_attributes = []

        models.each do |model|
          if supports_setting_primary_key_of_imported_objects?
            load_association_ids(model)
          end

          if is_validating && !validator.valid_model?(model)
            raise(ActiveRecord::RecordInvalid, model) if options[:raise_error]
            next
          end

          array_of_attributes << column_names.map do |name|
            if model.persisted? &&
               update_attrs && update_attrs.include?(name.to_sym) &&
               !model.send("#{name}_changed?")
              nil
            else
              model.read_attribute(name.to_s)
            end
          end
        end
        # supports array of hash objects
      elsif args.last.is_a?( Array ) && args.last.first.is_a?(Hash)
        if args.length == 2
          array_of_hashes = args.last
          column_names = args.first.dup
          allow_extra_hash_keys = true
        else
          array_of_hashes = args.first
          column_names = array_of_hashes.first.keys
          allow_extra_hash_keys = false
        end

        array_of_attributes = array_of_hashes.map do |h|
          error_message = validate_hash_import(h, column_names, allow_extra_hash_keys)

          raise ArgumentError, error_message if error_message

          column_names.map do |key|
            h[key]
          end
        end
        # supports empty array
      elsif args.last.is_a?( Array ) && args.last.empty?
        return ActiveRecord::Import::Result.new([], 0, [], [])
        # supports 2-element array and array
      elsif args.size == 2 && args.first.is_a?( Array ) && args.last.is_a?( Array )

        unless args.last.first.is_a?(Array)
          raise ArgumentError, "Last argument should be a two dimensional array '[[]]'. First element in array was a #{args.last.first.class}"
        end

        column_names, array_of_attributes = args

        # dup the passed args so we don't modify unintentionally
        column_names = column_names.dup
        array_of_attributes = array_of_attributes.map(&:dup)
      else
        raise ArgumentError, "Invalid arguments!"
      end

      # Force the primary key column into the insert if it's not in the list
      # and we are using a sequence, and stuff a nil value for it into each
      # row so the sequence will fire later
      symbolized_column_names = Array(column_names).map(&:to_sym)
      symbolized_primary_key = Array(primary_key).map(&:to_sym)

      if !symbolized_primary_key.to_set.subset?(symbolized_column_names.to_set) && connection.prefetch_primary_key? && sequence_name
        column_count = column_names.size
        column_names.concat(Array(primary_key)).uniq!
        columns_added = column_names.size - column_count
        new_fields = Array.new(columns_added)
        array_of_attributes.each { |a| a.concat(new_fields) }
      end

      # Don't modify incoming arguments
      on_duplicate_key_update = options[:on_duplicate_key_update]
      if on_duplicate_key_update
        updatable_columns = symbolized_column_names.reject { |c| symbolized_primary_key.include? c }
        options[:on_duplicate_key_update] = if on_duplicate_key_update.is_a?(Hash)
          on_duplicate_key_update.each_with_object({}) do |(k, v), duped_options|
            duped_options[k] = if k == :columns && v == :all
              updatable_columns
            elsif v.duplicable?
              v.dup
            else
              v
            end
          end
        elsif on_duplicate_key_update == :all
          updatable_columns
        elsif on_duplicate_key_update.duplicable?
          on_duplicate_key_update.dup
        else
          on_duplicate_key_update
        end
      end

      timestamps = {}

      # record timestamps unless disabled in ActiveRecord::Base
      if record_timestamps && options[:timestamps]
        timestamps = add_special_rails_stamps column_names, array_of_attributes, options
      end

      return_obj = if is_validating
        import_with_validations( column_names, array_of_attributes, options ) do |failed_instances|
          if models
            models.each { |m| failed_instances << m if m.errors.any? }
          else
            # create instances for each of our column/value sets
            arr = validations_array_for_column_names_and_attributes( column_names, array_of_attributes )

            # keep track of the instance and the position it is currently at. if this fails
            # validation we'll use the index to remove it from the array_of_attributes
            arr.each_with_index do |hsh, i|
              # utilize block initializer syntax to prevent failure when 'mass_assignment_sanitizer = :strict'
              model = new do |m|
                hsh.each_pair { |k, v| m[k] = v }
              end

              next if validator.valid_model?(model)
              raise(ActiveRecord::RecordInvalid, model) if options[:raise_error]

              array_of_attributes[i] = nil
              failure = model.dup
              failure.errors.send(:initialize_dup, model.errors)
              failed_instances << (options[:track_validation_failures] ? [i, failure] : failure )
            end
            array_of_attributes.compact!
          end
        end
      else
        import_without_validations_or_callbacks( column_names, array_of_attributes, options )
      end

      if options[:synchronize]
        sync_keys = options[:synchronize_keys] || Array(primary_key)
        synchronize( options[:synchronize], sync_keys)
      end
      return_obj.num_inserts = 0 if return_obj.num_inserts.nil?

      # if we have ids, then set the id on the models and mark the models as clean.
      if models && supports_setting_primary_key_of_imported_objects?
        set_attributes_and_mark_clean(models, return_obj, timestamps, options)

        # if there are auto-save associations on the models we imported that are new, import them as well
        if options[:recursive]
          options[:on_duplicate_key_update] = on_duplicate_key_update unless on_duplicate_key_update.nil?
          import_associations(models, options.dup.merge(validate: false))
        end
      end

      return_obj
    end

    # Imports the passed in +column_names+ and +array_of_attributes+
    # given the passed in +options+ Hash with validations. Returns an
    # object with the methods +failed_instances+ and +num_inserts+.
    # +failed_instances+ is an array of instances that failed validations.
    # +num_inserts+ is the number of inserts it took to import the data. See
    # ActiveRecord::Base.import for more information on
    # +column_names+, +array_of_attributes+ and +options+.
    def import_with_validations( column_names, array_of_attributes, options = {} )
      failed_instances = []

      yield failed_instances if block_given?

      result = if options[:all_or_none] && failed_instances.any?
        ActiveRecord::Import::Result.new([], 0, [], [])
      else
        import_without_validations_or_callbacks( column_names, array_of_attributes, options )
      end
      ActiveRecord::Import::Result.new(failed_instances, result.num_inserts, result.ids, result.results)
    end

    # Imports the passed in +column_names+ and +array_of_attributes+
    # given the passed in +options+ Hash. This will return the number
    # of insert operations it took to create these records without
    # validations or callbacks. See ActiveRecord::Base.import for more
    # information on +column_names+, +array_of_attributes+ and
    # +options+.
    def import_without_validations_or_callbacks( column_names, array_of_attributes, options = {} )
      return ActiveRecord::Import::Result.new([], 0, [], []) if array_of_attributes.empty?

      column_names = column_names.map(&:to_sym)
      scope_columns, scope_values = scope_attributes.to_a.transpose

      unless scope_columns.blank?
        scope_columns.zip(scope_values).each do |name, value|
          name_as_sym = name.to_sym
          next if column_names.include?(name_as_sym) || name_as_sym == inheritance_column.to_sym
          column_names << name_as_sym
          array_of_attributes.each { |attrs| attrs << value }
        end
      end

      if finder_needs_type_condition?
        unless column_names.include?(inheritance_column.to_sym)
          column_names << inheritance_column.to_sym
          array_of_attributes.each { |attrs| attrs << sti_name }
        end
      end

      columns = column_names.each_with_index.map do |name, i|
        column = columns_hash[name.to_s]
        raise ActiveRecord::Import::MissingColumnError.new(name.to_s, i) if column.nil?
        column
      end

      columns_sql = "(#{column_names.map { |name| connection.quote_column_name(name) }.join(',')})"
      pre_sql_statements = connection.pre_sql_statements( options )
      insert_sql = ['INSERT', pre_sql_statements, "INTO #{quoted_table_name} #{columns_sql} VALUES "]
      insert_sql = insert_sql.flatten.join(' ')
      values_sql = values_sql_for_columns_and_attributes(columns, array_of_attributes)

      number_inserted = 0
      ids = []
      results = []
      if supports_import?
        # generate the sql
        post_sql_statements = connection.post_sql_statements( quoted_table_name, options )
        import_size = values_sql.size

        batch_size = options[:batch_size] || import_size
        run_proc = options[:batch_size].to_i.positive? && options[:batch_progress].respond_to?( :call )
        progress_proc = options[:batch_progress]
        current_batch = 0
        batches = (import_size / batch_size.to_f).ceil

        values_sql.each_slice(batch_size) do |batch_values|
          batch_started_at = Time.now.to_i

          # perform the inserts
          result = connection.insert_many( [insert_sql, post_sql_statements].flatten,
            batch_values,
            options,
            "#{model_name} Create Many" )

          number_inserted += result.num_inserts
          ids += result.ids
          results += result.results
          current_batch += 1

          progress_proc.call(import_size, batches, current_batch, Time.now.to_i - batch_started_at) if run_proc
        end
      else
        transaction(requires_new: true) do
          values_sql.each do |values|
            ids << connection.insert(insert_sql + values)
            number_inserted += 1
          end
        end
      end
      ActiveRecord::Import::Result.new([], number_inserted, ids, results)
    end

    private

    def set_attributes_and_mark_clean(models, import_result, timestamps, options)
      return if models.nil?
      models -= import_result.failed_instances

      # if ids were returned for all models we know all were updated
      if models.size == import_result.ids.size
        import_result.ids.each_with_index do |id, index|
          model = models[index]
          model.id = id

          timestamps.each do |attr, value|
            model.send(attr + "=", value) if model.send(attr).nil?
          end
        end
      end

      deserialize_value = lambda do |column, value|
        column = columns_hash[column]
        return value unless column
        if respond_to?(:type_caster)
          type = type_for_attribute(column.name)
          type.deserialize(value)
        elsif column.respond_to?(:type_cast_from_database)
          column.type_cast_from_database(value)
        else
          value
        end
      end

      set_value = lambda do |model, column, value|
        val = deserialize_value.call(column, value)
        if model.attribute_names.include?(column)
          model.send("#{column}=", val)
        else
          attributes = attributes_builder.build_from_database(model.attributes.merge(column => val))
          model.instance_variable_set(:@attributes, attributes)
        end
      end

      columns = Array(options[:returning_columns])
      results = Array(import_result.results)
      if models.size == results.size
        single_column = columns.first if columns.size == 1
        results.each_with_index do |result, index|
          model = models[index]

          if single_column
            set_value.call(model, single_column, result)
          else
            columns.each_with_index do |column, col_index|
              set_value.call(model, column, result[col_index])
            end
          end
        end
      end

      models.each do |model|
        if model.respond_to?(:changes_applied) # Rails 4.1.8 and higher
          model.changes_internally_applied if model.respond_to?(:changes_internally_applied) # legacy behavior for Rails 5.1
          model.changes_applied
        elsif model.respond_to?(:clear_changes_information) # Rails 4.0 and higher
          model.clear_changes_information
        else # Rails 3.2
          model.instance_variable_get(:@changed_attributes).clear
        end
        model.instance_variable_set(:@new_record, false)
      end
    end

    # Sync belongs_to association ids with foreign key field
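    # For example, if a record was assigned an association object but its
    # foreign key column was never set, this copies the association's primary
    # key into that column before import.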
    def load_association_ids(model)
      changed_columns = model.changed
      association_reflections = model.class.reflect_on_all_associations(:belongs_to)
      association_reflections.each do |association_reflection|
        next if association_reflection.options[:polymorphic]

        column_names = Array(association_reflection.foreign_key).map(&:to_s)
        column_names.each_with_index do |column_name, column_index|
          next if changed_columns.include?(column_name)

          association = model.association(association_reflection.name)
          association = association.target
          next if association.blank? || model.public_send(column_name).present?

          association_primary_key = Array(association_reflection.association_primary_key)[column_index]
          model.public_send("#{column_name}=", association.send(association_primary_key))
        end
      end
    end

    def import_associations(models, options)
      # now, for all the dirty associations, collect them into a new set of models, then recurse.
      # notes:
      #    does not handle associations that reference themselves
      #    should probably take a hash to associations to follow.
      return if models.nil?
      associated_objects_by_class = {}
      models.each { |model| find_associated_objects_for_import(associated_objects_by_class, model) }

      # :on_duplicate_key_update only supported for all fields
      options.delete(:on_duplicate_key_update) unless options[:on_duplicate_key_update] == :all
      # :returning not supported for associations
      options.delete(:returning)

      associated_objects_by_class.each_value do |associations|
        associations.each_value do |associated_records|
          associated_records.first.class.bulk_import(associated_records, options) unless associated_records.empty?
        end
      end
    end

    # We are eventually going to call Class.import <objects> so we build up a hash
    # of class => objects to import.
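    # The resulting structure is keyed by class name and association name,
    # e.g. { "Post" => { comments: [<new or changed Comment records>] } }
    # (names illustrative).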
    def find_associated_objects_for_import(associated_objects_by_class, model)
      associated_objects_by_class[model.class.name] ||= {}
      return associated_objects_by_class unless model.id

      association_reflections =
        model.class.reflect_on_all_associations(:has_one) +
        model.class.reflect_on_all_associations(:has_many)
      association_reflections.each do |association_reflection|
        associated_objects_by_class[model.class.name][association_reflection.name] ||= []

        association = model.association(association_reflection.name)
        association.loaded!

        # Wrap target in an array if not already
        association = Array(association.target)

        changed_objects = association.select { |a| a.new_record? || a.changed? }
        changed_objects.each do |child|
          child.public_send("#{association_reflection.foreign_key}=", model.id)
          # For polymorphic associations
          association_name = if model.class.respond_to?(:polymorphic_name)
            model.class.polymorphic_name
          else
            model.class.base_class
          end
          association_reflection.type.try do |type|
            child.public_send("#{type}=", association_name)
          end
        end
        associated_objects_by_class[model.class.name][association_reflection.name].concat changed_objects
      end
      associated_objects_by_class
    end

    # Returns the SQL VALUES for an INSERT statement given the passed in +columns+
    # and +array_of_attributes+.
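    # Each element is one parenthesized row, e.g. two rows over two string
    # columns come back roughly as ["('a','b')", "('c','d')"] (exact quoting
    # depends on the connection adapter).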
    def values_sql_for_columns_and_attributes(columns, array_of_attributes) # :nodoc:
      # connection gets called a *lot* in this high intensity loop.
      # Reuse the same one within the loop, otherwise it would keep being re-retrieved (= lots of time for large imports)
      connection_memo = connection

      array_of_attributes.map do |arr|
        my_values = arr.each_with_index.map do |val, j|
          column = columns[j]

          # be sure to query sequence_name *last*, only if cheaper tests fail, because it's costly
          if val.nil? && Array(primary_key).first == column.name && !sequence_name.blank?
            connection_memo.next_value_for_sequence(sequence_name)
          elsif val.respond_to?(:to_sql)
            "(#{val.to_sql})"
          elsif column
            if respond_to?(:type_caster)                                         # Rails 5.0 and higher
              type = type_for_attribute(column.name)
              val = !type.respond_to?(:subtype) && type.type == :boolean ? type.cast(val) : type.serialize(val)
              connection_memo.quote(val)
            elsif column.respond_to?(:type_cast_from_user)                       # Rails 4.2
              connection_memo.quote(column.type_cast_from_user(val), column)
            else                                                                 # Rails 3.2, 4.0 and 4.1
              if serialized_attributes.include?(column.name)
                val = serialized_attributes[column.name].dump(val)
              end
              # Fixes #443 to support binary (i.e. bytea) columns on PG
              val = column.type_cast(val) unless column.type && column.type.to_sym == :binary
              connection_memo.quote(val, column)
            end
          else
            raise ArgumentError, "Number of values (#{arr.length}) exceeds number of columns (#{columns.length})"
          end
        end
        "(#{my_values.join(',')})"
      end
    end

    def add_special_rails_stamps( column_names, array_of_attributes, options )
      timestamp_columns = {}
      timestamps        = {}

      if respond_to?(:all_timestamp_attributes_in_model, true) # Rails 5.1 and higher
        timestamp_columns[:create] = timestamp_attributes_for_create_in_model
        timestamp_columns[:update] = timestamp_attributes_for_update_in_model
      else
        instance = allocate
        timestamp_columns[:create] = instance.send(:timestamp_attributes_for_create_in_model)
        timestamp_columns[:update] = instance.send(:timestamp_attributes_for_update_in_model)
      end

      # use tz as set in ActiveRecord::Base
      default_timezone = if ActiveRecord.respond_to?(:default_timezone)
        ActiveRecord.default_timezone
      else
        ActiveRecord::Base.default_timezone
      end
      timestamp = default_timezone == :utc ? Time.now.utc : Time.now

      [:create, :update].each do |action|
        timestamp_columns[action].each do |column|
          column = column.to_s
          timestamps[column] = timestamp

          index = column_names.index(column) || column_names.index(column.to_sym)
          if index
            # replace every instance of the array of attributes with our value
            array_of_attributes.each { |arr| arr[index] = timestamp if arr[index].nil? }
          else
            column_names << column
            array_of_attributes.each { |arr| arr << timestamp }
          end

          if supports_on_duplicate_key_update? && action == :update
            connection.add_column_for_on_duplicate_key_update(column, options)
          end
        end
      end

      timestamps
    end

    # Returns an Array of Hashes for the passed in +column_names+ and +array_of_attributes+.
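    # e.g. [:title, :author] with [['Foo', 'Bar']] yields [{ title: 'Foo', author: 'Bar' }].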
    def validations_array_for_column_names_and_attributes( column_names, array_of_attributes ) # :nodoc:
      array_of_attributes.map { |values| Hash[column_names.zip(values)] }
    end

    # Checks that the imported hash has the required_keys, optionally also checks that the hash has
    # no keys beyond those required when `allow_extra_keys` is false.
    # Returns `nil` if validation passes, or an error message if it fails.
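    # For example, validate_hash_import({ title: 'Foo' }, [:title, :author], false)
    # returns a message listing :author under "Missing keys".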
    def validate_hash_import(hash, required_keys, allow_extra_keys) # :nodoc:
      extra_keys = allow_extra_keys ? [] : hash.keys - required_keys
      missing_keys = required_keys - hash.keys

      return nil if extra_keys.empty? && missing_keys.empty?

      if allow_extra_keys
        <<-EOS
Hash key mismatch.

When importing an array of hashes with provided columns_names, each hash must contain keys for all column_names.

Required keys: #{required_keys}
Missing keys: #{missing_keys}

Hash: #{hash}
        EOS
      else
        <<-EOS
Hash key mismatch.

When importing an array of hashes, all hashes must have the same keys.
If you have records that are missing some values, we recommend you either set default values
for the missing keys or group these records into batches by key set before importing.

Required keys: #{required_keys}
Extra keys: #{extra_keys}
Missing keys: #{missing_keys}

Hash: #{hash}
        EOS
      end
    end
  end
end