File: tracker.rb

package info (click to toggle)
gitlab 17.6.5-19
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 629,368 kB
  • sloc: ruby: 1,915,304; javascript: 557,307; sql: 60,639; xml: 6,509; sh: 4,567; makefile: 1,239; python: 406
file content (144 lines) | stat: -rw-r--r-- 3,436 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# frozen_string_literal: true

# Tracks execution of a single import pipeline (one "relation") for a
# bulk-import entity: which pipeline runs, at which stage, its paging
# progress, and its lifecycle status (via a state machine).
class BulkImports::Tracker < ApplicationRecord
  include AfterCommitQueue

  self.table_name = 'bulk_import_trackers'

  # The `relation` column stores the fully-qualified pipeline class name
  # (see #pipeline_class, which constantizes it), so it doubles as the
  # pipeline identifier.
  alias_attribute :pipeline_name, :relation

  belongs_to :entity,
    class_name: 'BulkImports::Entity',
    inverse_of: :trackers,
    foreign_key: :bulk_import_entity_id,
    optional: false

  # Batched pipelines fan out into per-batch trackers (see #propagate_cancel).
  has_many :batches, class_name: 'BulkImports::BatchTracker', inverse_of: :tracker

  # Each pipeline may appear at most once per entity.
  validates :relation,
    presence: true,
    uniqueness: { scope: :bulk_import_entity_id }

  # When pagination reports another page, the cursor must be recorded.
  validates :next_page, presence: { if: :has_next_page? }

  validates :stage, presence: true

  delegate :file_extraction_pipeline?, :abort_on_failure?, to: :pipeline_class

  # NOTE(review): presumably the per-request page size used by extractors
  # elsewhere — usage is outside this file; confirm against callers.
  DEFAULT_PAGE_SIZE = 500

  # Trackers for the entity that are still :created and belong to the
  # lowest (i.e. next) stage — stages are processed in ascending order.
  scope :next_pipeline_trackers_for, ->(entity_id) {
    entity_scope = where(bulk_import_entity_id: entity_id)
    next_stage_scope = entity_scope.with_status(:created).select('MIN(stage)')

    entity_scope.where(stage: next_stage_scope).with_status(:created)
  }

  # Trackers for the entity currently in flight (:enqueued or :started).
  scope :running_trackers, ->(entity_id) {
    where(bulk_import_entity_id: entity_id).with_status(:enqueued, :started)
  }

  # Resolves the stored pipeline name to its class.
  #
  # @return [Class] the pipeline class
  # @raise [BulkImports::Error] if the entity does not recognise the name,
  #   guarding #constantize against arbitrary class resolution.
  def pipeline_class
    unless entity.pipeline_exists?(pipeline_name)
      raise BulkImports::Error, "'#{pipeline_name}' is not a valid BulkImport Pipeline"
    end

    pipeline_name.constantize
  end

  # Lifecycle: created -> enqueued -> started -> finished, with terminal
  # failure/skip/cancel states (negative values) and :timeout for stale runs.
  # Integer values are persisted in the :status column — do not renumber.
  state_machine :status, initial: :created do
    state :created, value: 0
    state :started, value: 1
    state :finished, value: 2
    state :enqueued, value: 3
    state :timeout, value: 4
    state :failed, value: -1
    state :skipped, value: -2
    state :canceled, value: -3

    event :start do
      transition enqueued: :started
      # To avoid errors when re-starting a pipeline in case of network errors
      transition started: :started
    end

    event :retry do
      transition started: :enqueued
      # To avoid errors when retrying a pipeline in case of network errors
      transition enqueued: :enqueued
    end

    event :enqueue do
      transition created: :enqueued
    end

    event :finish do
      transition started: :finished
      # Self-transitions below make #finish a safe no-op on already-terminal
      # trackers instead of raising an invalid-transition error.
      transition failed: :failed
      transition skipped: :skipped
    end

    event :skip do
      transition any => :skipped
    end

    event :fail_op do
      transition any => :failed
    end

    event :cancel do
      transition any => :canceled
    end

    # Stale (:created/:started) trackers are swept into :timeout.
    event :cleanup_stale do
      transition [:created, :started] => :timeout
    end

    # Flush the in-memory/cached object counters to the database once the
    # tracker reaches a terminal outcome.
    after_transition any => [:finished, :failed] do |tracker|
      BulkImports::ObjectCounter.persist!(tracker)
    end

    # Cascade cancellation to batch trackers only after the status change
    # is committed, so batches never observe a stale tracker state.
    after_transition any => [:canceled] do |tracker|
      tracker.run_after_commit do
        tracker.propagate_cancel
      end
    end
  end

  # Source/fetched/imported object counts for this tracker's relation.
  #
  # @return [Hash{Symbol => Hash}, nil] e.g. { issues: { source:, fetched:, imported: } };
  #   nil for non file-extraction pipelines.
  def checksums
    return unless file_extraction_pipeline?

    # Return cached counters until they expire
    { importing_relation => cached_checksums || persisted_checksums }
  end

  # True when there are no checksums at all, or all three counters are zero.
  def checksums_empty?
    return true unless checksums

    sums = checksums[importing_relation]

    sums[:source] == 0 && sums[:fetched] == 0 && sums[:imported] == 0
  end

  # The relation name (as a Symbol) declared by the pipeline class.
  def importing_relation
    pipeline_class.relation.to_sym
  end

  # Fires the :cancel event on every batch tracker (used by the
  # after-commit hook on the :canceled transition).
  def propagate_cancel
    batches.each(&:cancel)
  end

  private

  # Short-lived counter summary from ObjectCounter; nil once expired,
  # in which case callers fall back to #persisted_checksums.
  def cached_checksums
    BulkImports::ObjectCounter.summary(self)
  end

  # Counters read back from this record's persisted *_objects_count columns.
  def persisted_checksums
    {
      source: source_objects_count,
      fetched: fetched_objects_count,
      imported: imported_objects_count
    }
  end
end