File: enumerators.rb

package info (click to toggle)
ruby-sidekiq 7.3.2%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 956 kB
  • sloc: ruby: 6,094; javascript: 526; makefile: 21; sh: 20
file content (135 lines) | stat: -rw-r--r-- 4,801 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# frozen_string_literal: true

require_relative "active_record_enumerator"
require_relative "csv_enumerator"

module Sidekiq
  module Job
    module Iterable
      # Factory helpers that build the enumerators an iterable job can return
      # from its +build_enumerator+ method. Every helper takes the resume
      # position as the +cursor:+ keyword.
      module Enumerators
        # Wraps an in-memory array in an Enumerator that starts at +cursor+.
        # The enumerator yields +[element, index]+ pairs, where +index+ is the
        # element's position in the original array.
        #
        # @param array [Array] the collection to iterate
        # @param cursor [Integer, nil] number of leading elements to skip;
        #   +nil+ is treated as 0
        #
        # @return [Enumerator] enumerator over the remaining pairs; its +size+
        #   is the number of pairs left
        #
        # @raise [ArgumentError] if +array+ is not an Array
        #
        # @example
        #   array_enumerator(['build', 'enumerator', 'from', 'any', 'array'], cursor: cursor)
        #
        def array_enumerator(array, cursor:)
          unless array.is_a?(Array)
            raise ArgumentError, "array must be an Array"
          end

          offset = cursor || 0
          # Pair each element with its original index first, so the indexes
          # survive dropping the already-processed prefix.
          remaining_pairs = array.each_with_index.drop(offset)
          remaining_pairs.enum_for(:each) { remaining_pairs.size }
        end

        # Iterates an `ActiveRecord::Relation` one record at a time.
        # Every tick of the enumerator advances the cursor by a single row.
        #
        # @param relation [ActiveRecord::Relation] the relation to walk
        # @param cursor [Object] id offset at which iteration should start
        # @param options [Hash] extra options, forwarded untouched to
        #   `ActiveRecordEnumerator` (used for the relevant ActiveRecord
        #   batching methods)
        #
        # @return [Enumerator] presumably -- this is whatever
        #   `ActiveRecordEnumerator#records` returns; confirm against that class
        #
        # @example
        #   def build_enumerator(cursor:)
        #     active_record_records_enumerator(User.all, cursor: cursor)
        #   end
        #
        #   def each_iteration(user)
        #     user.notify_about_something
        #   end
        #
        def active_record_records_enumerator(relation, cursor:, **options)
          source = ActiveRecordEnumerator.new(relation, cursor: cursor, **options)
          source.records
        end

        # Iterates an `ActiveRecord::Relation` in batches of records.
        # Every tick of the enumerator advances the cursor by `:batch_size` rows.
        # Takes the same arguments as {#active_record_records_enumerator}.
        # @see #active_record_records_enumerator
        #
        # @example
        #   def build_enumerator(product_id, cursor:)
        #     active_record_batches_enumerator(
        #       Comment.where(product_id: product_id).select(:id),
        #       cursor: cursor,
        #       batch_size: 100
        #     )
        #   end
        #
        #   def each_iteration(batch_of_comments, product_id)
        #     comment_ids = batch_of_comments.map(&:id)
        #     CommentService.call(comment_ids: comment_ids)
        #   end
        #
        def active_record_batches_enumerator(relation, cursor:, **options)
          source = ActiveRecordEnumerator.new(relation, cursor: cursor, **options)
          source.batches
        end

        # Iterates an `ActiveRecord::Relation` in batches, where each yielded
        # batch is itself an `ActiveRecord::Relation`.
        # Takes the same arguments as {#active_record_records_enumerator}.
        # @see #active_record_records_enumerator
        #
        # @example
        #   def build_enumerator(product_id, cursor:)
        #     active_record_relations_enumerator(
        #       Product.find(product_id).comments,
        #       cursor: cursor,
        #       batch_size: 100,
        #     )
        #   end
        #
        #   def each_iteration(batch_of_comments, product_id)
        #     # batch_of_comments will be a Comment::ActiveRecord_Relation
        #     batch_of_comments.update_all(deleted: true)
        #   end
        #
        def active_record_relations_enumerator(relation, cursor:, **options)
          source = ActiveRecordEnumerator.new(relation, cursor: cursor, **options)
          source.relations
        end

        # Iterates the rows of a CSV file.
        #
        # @param csv [CSV] an instance of CSV object
        # @param cursor [Integer] offset at which iteration should start
        #
        # @return the result of `CsvEnumerator#rows` for the given cursor
        #
        # @example
        #   def build_enumerator(import_id, cursor:)
        #     import = Import.find(import_id)
        #     csv_enumerator(import.csv, cursor: cursor)
        #   end
        #
        #   def each_iteration(csv_row)
        #     # insert csv_row into database
        #   end
        #
        def csv_enumerator(csv, cursor:)
          reader = CsvEnumerator.new(csv)
          reader.rows(cursor: cursor)
        end

        # Iterates a CSV file in batches of rows.
        #
        # @param csv [CSV] an instance of CSV object
        # @param cursor [Integer] offset at which iteration should start
        # @option options :batch_size [Integer] (100) size of the batch
        #
        # @return the result of `CsvEnumerator#batches` for the given cursor
        #   and options
        #
        # @example
        #   def build_enumerator(import_id, cursor:)
        #     import = Import.find(import_id)
        #     csv_batches_enumerator(import.csv, cursor: cursor)
        #   end
        #
        #   def each_iteration(batch_of_csv_rows)
        #     # ...
        #   end
        #
        def csv_batches_enumerator(csv, cursor:, **options)
          reader = CsvEnumerator.new(csv)
          reader.batches(cursor: cursor, **options)
        end
      end
    end
  end
end