File: aggregation.rb

package info (click to toggle)
gitlab 17.6.5-19
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 629,368 kB
  • sloc: ruby: 1,915,304; javascript: 557,307; sql: 60,639; xml: 6,509; sh: 4,567; makefile: 1,239; python: 406
file content (132 lines) | stat: -rw-r--r-- 4,612 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# frozen_string_literal: true

# Tracks the state of cycle analytics data aggregation for a top-level namespace
# (rows are looked up by `group_id`): when the last run happened, the cursors needed to
# resume a run, and the most recent runtime / processed-record statistics per mode.
class Analytics::CycleAnalytics::Aggregation < ApplicationRecord
  include FromUnion
  include Analytics::CycleAnalytics::Parentable

  # Each statistics column holds at most 10 entries (see #set_stats).
  # NOTE(review): with `allow_blank: true` blank values skip these validators, so
  # `presence: true` looks redundant here - confirm before removing it.
  validates :incremental_runtimes_in_seconds,
    :incremental_processed_records,
    :full_runtimes_in_seconds,
    :full_processed_records,
    presence: true,
    length: { maximum: 10 },
    allow_blank: true

  # Orders by the given timestamp column ascending, NULL values first.
  scope :priority_order,
    ->(column_to_sort = :last_incremental_run_at) { order(arel_table[column_to_sort].asc.nulls_first) }
  scope :enabled, -> { where('enabled IS TRUE') }

  # Returns the stored cursor for the given mode and model as a hash with the
  # `:updated_at` and `:id` keys. Keys whose stored value is nil are omitted.
  #
  # mode  - interpolated into the column name (`last_<mode>_<table>_...`).
  # model - object responding to `table_name`.
  def cursor_for(mode, model)
    {
      updated_at: self["last_#{mode}_#{model.table_name}_updated_at"],
      id: self["last_#{mode}_#{model.table_name}_id"]
    }.compact
  end

  # Returns the consistency-check cursor for the given model, or an empty hash
  # when no issuable id has been stored yet.
  #
  # model - object responding to `issuable_model` and `issuable_id_column`.
  def consistency_check_cursor_for(model)
    column_prefix = "last_consistency_check_#{model.issuable_model.table_name}"
    issuable_id = self["#{column_prefix}_issuable_id"]
    return {} if issuable_id.nil?

    {
      :end_event_timestamp => self["#{column_prefix}_end_event_timestamp"],
      model.issuable_id_column => issuable_id
    }
  end

  # Stamps `last_<mode>_run_at` with the current time (in memory only, no save).
  def refresh_last_run(mode)
    self["last_#{mode}_run_at"] = Time.current
  end

  # Clears the full-mode cursors for issues and merge requests (in memory only, no save).
  def complete
    self.last_full_issues_id = nil
    self.last_full_issues_updated_at = nil
    self.last_full_merge_requests_id = nil
    self.last_full_merge_requests_updated_at = nil
  end

  # Stores the given cursor (`:id` and `:updated_at`) for the given mode and model
  # (in memory only, no save).
  def set_cursor(mode, model, cursor)
    self["last_#{mode}_#{model.table_name}_id"] = cursor[:id]
    self["last_#{mode}_#{model.table_name}_updated_at"] = cursor[:updated_at]
  end

  # Appends a runtime and a processed-records data point for the given mode.
  def set_stats(mode, runtime, processed_records)
    # We only store the last 10 data points
    self["#{mode}_runtimes_in_seconds"] = (self["#{mode}_runtimes_in_seconds"] + [runtime]).last(10)
    self["#{mode}_processed_records"] = (self["#{mode}_processed_records"] + [processed_records]).last(10)
  end

  # Estimates when this record will be picked up by the next incremental run.
  #
  # Returns nil when the aggregation is disabled, when it has never run
  # incrementally, or when the computed estimate is less than one second;
  # otherwise returns the estimate applied to the current time.
  def estimated_next_run_at
    return unless enabled
    return if last_incremental_run_at.nil?

    estimation = duration_until_the_next_aggregation_job +
      average_aggregation_duration +
      (last_incremental_run_at - earliest_last_run_at)

    return if estimation < 1

    estimation.from_now
  end

  # Returns the enabled aggregation record for the top-level namespace of
  # `target_namespace`, creating or re-enabling it when necessary.
  def self.safe_create_for_namespace(target_namespace)
    # Namespaces::ProjectNamespace has no root_ancestor
    # Related: https://gitlab.com/gitlab-org/gitlab/-/issues/386124
    namespace = if target_namespace.is_a?(Group) || target_namespace.is_a?(Namespaces::UserNamespace)
                  target_namespace
                else
                  target_namespace.parent
                end
    # personal namespace projects and associated ProjectNamespace respond to `namespace`
    # and this is close enough to "root ancestor"
    top_level_namespace =
      target_namespace.respond_to?(:root_ancestor) ? namespace.root_ancestor : namespace.namespace
    aggregation = find_by(group_id: top_level_namespace.id)
    return aggregation if aggregation&.enabled?

    # At this point we're sure that the group is licensed, we can always enable the aggregation.
    # This re-enables the aggregation in case the group downgraded and later upgraded the license.
    upsert({ group_id: top_level_namespace.id, enabled: true })

    find(top_level_namespace.id)
  end

  # Builds a relation of enabled aggregations to process next: rows where
  # `column_to_query` is NULL, unioned with rows where `column_to_query` is
  # earlier than `last_run_at`. Both branches, and the union itself, are limited
  # to `batch_size` rows and ordered via `priority_order`.
  #
  # Deliberately defined above `private`: the `private` keyword does not apply to
  # `def self.` methods, so this method is publicly callable wherever it is placed.
  def self.load_batch(last_run_at, column_to_query = :last_incremental_run_at, batch_size = 100)
    last_run_at_not_set = Analytics::CycleAnalytics::Aggregation
      .enabled
      .where(column_to_query => nil)
      .priority_order(column_to_query)
      .limit(batch_size)

    last_run_at_before = Analytics::CycleAnalytics::Aggregation
      .enabled
      .where(arel_table[column_to_query].lt(last_run_at))
      .priority_order(column_to_query)
      .limit(batch_size)

    Analytics::CycleAnalytics::Aggregation
      .from_union([last_run_at_not_set, last_run_at_before], remove_order: false, remove_duplicates: false)
      .limit(batch_size)
  end

  private

  # The aggregation job is scheduled every 10 minutes: */10 * * * *
  def duration_until_the_next_aggregation_job
    (10 - (Time.current.min % 10)).minutes.seconds
  end

  # Mean of the stored incremental runtimes, or zero seconds when none are stored.
  def average_aggregation_duration
    return 0.seconds if incremental_runtimes_in_seconds.empty?

    average = incremental_runtimes_in_seconds.sum.fdiv(incremental_runtimes_in_seconds.size)
    average.seconds
  end

  # Earliest non-NULL `last_incremental_run_at` among enabled aggregations.
  def earliest_last_run_at
    earliest_run_sql = self.class.select(:last_incremental_run_at)
      .where(enabled: true)
      .where.not(last_incremental_run_at: nil)
      .priority_order
      .limit(1)
      .to_sql

    connection.select_value("(#{earliest_run_sql})")
  end
end