File: iterator.rb

package info (click to toggle)
gitlab 17.6.5-19
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 629,368 kB
  • sloc: ruby: 1,915,304; javascript: 557,307; sql: 60,639; xml: 6,509; sh: 4,567; makefile: 1,239; python: 406
file content (83 lines) | stat: -rw-r--r-- 2,778 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# frozen_string_literal: true

module ClickHouse
  # This class implements a batch iterator which can be used for ClickHouse database tables.
  # The batching logic uses fixed id ranges because that's the only way to efficiently batch
  # over the data. This is similar to the implementation of the Gitlab::Database::BatchCount
  # utility class.
  #
  # Usage:
  #
  # connection = ClickHouse::Connection.new(:main)
  # builder = ClickHouse::QueryBuilder.new('event_authors')
  # iterator = ClickHouse::Iterator.new(query_builder: builder, connection: connection)
  # iterator.each_batch(column: :author_id, of: 100000) do |scope|
  #   puts scope.to_sql
  #   puts ClickHouse::Client.select(scope.to_sql, :main)
  # end
  #
  # If your database table structure is optimized for a specific filter, you can scan a smaller
  # part of the table by adding more conditions to the query builder. Example:
  #
  # builder = ClickHouse::QueryBuilder.new('event_authors').where(type: 'some_type')
  class Iterator
    # rubocop: disable CodeReuse/ActiveRecord -- this is a ClickHouse query builder class using Arel

    # @param query_builder [Object] scope to iterate over; it is sent +table+,
    #   +select+ and +where+ (plus +order+ / +limit+ for the :order_limit strategy)
    # @param connection [Object] it is sent +select(sql)+; the first returned row is
    #   read with the 'min' and 'max' keys
    # @param min_value [Object, nil] when given, batching starts at this value instead
    #   of the minimum found by the min/max query
    # @param min_max_strategy [Symbol] :min_max (aggregate query) or :order_limit
    #   (two ordered LIMIT 1 sub-queries); validated lazily when batching starts
    def initialize(query_builder:, connection:, min_value: nil, min_max_strategy: :min_max)
      @query_builder = query_builder
      @connection = connection
      @min_value = min_value
      @min_max_strategy = min_max_strategy
    end

    # Walks the value range of +column+ in consecutive, non-overlapping windows of
    # width +of+ and yields one narrowed scope per window. Windows are fixed ranges:
    # the last one may extend past the maximum value, and a window may match no rows.
    #
    # @param column [Symbol] column whose values define the windows
    # @param of [Numeric] window width; must be positive
    # @yieldparam scope [Object] query_builder restricted to lower <= column <= upper
    # @yieldparam lower [Object] inclusive lower bound of the window
    # @yieldparam upper [Object] inclusive upper bound of the window
    # @return [nil]
    # @raise [ArgumentError] when +of+ is not positive or the min/max strategy is unknown
    def each_batch(column: :id, of: 10_000)
      # A zero or negative width would never advance the lower bound past the
      # maximum, so the iteration would not terminate. Fail before running any query.
      raise ArgumentError, "Batch size (of:) must be positive, got: #{of.inspect}" unless of.positive?

      lower, max = min_max(column)
      # NOTE(review): `max == 0` presumably covers aggregates over an empty set being
      # reported as 0 rather than NULL - confirm against the ClickHouse column type.
      return if lower.nil? || max.nil? || max == 0

      while lower <= max
        upper = lower + of - 1
        scope = query_builder
          .where(table[column].gteq(lower))
          .where(table[column].lteq(upper))

        yield scope, lower, upper

        lower += of
      end
    end

    private

    delegate :table, to: :query_builder

    attr_reader :query_builder, :connection, :min_value, :min_max_strategy

    # Runs the min/max query for +column+.
    #
    # @return [Array, nil] +[lower, max]+, or nil when the query returns no row
    # @raise [ArgumentError] when min_max_strategy is not a known strategy
    def min_max(column)
      row = connection.select(min_max_sql(column)).first
      return if row.nil?

      # An explicitly configured min_value takes precedence over the queried minimum.
      [min_value || row['min'], row['max']]
    end

    # Builds the SQL returning a single row with 'min' and 'max' columns, according
    # to the configured strategy.
    def min_max_sql(column)
      case min_max_strategy
      when :min_max
        query_builder.select(
          table[column].minimum.as('min'),
          table[column].maximum.as('max')
        ).to_sql
      when :order_limit
        min_query = query_builder.select(table[column]).order(column, :asc).limit(1)
        max_query = query_builder.select(table[column]).order(column, :desc).limit(1)

        "SELECT (#{min_query.to_sql}) AS min, (#{max_query.to_sql}) AS max"
      else
        raise ArgumentError, "Unknown min_max strategy is given: #{min_max_strategy}"
      end
    end

    # rubocop: enable CodeReuse/ActiveRecord
  end
end