# frozen_string_literal: true

module Ci
  module PipelineProcessing
    class AtomicProcessingService
      include Gitlab::Utils::StrongMemoize
      include ExclusiveLeaseGuard

      attr_reader :pipeline

      DEFAULT_LEASE_TIMEOUT = 1.minute
      BATCH_SIZE = 20
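
      # Typical usage (sketch): process a pipeline's jobs and stages in one
      # atomic pass.
      #
      #   Ci::PipelineProcessing::AtomicProcessingService.new(pipeline).execute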
      def initialize(pipeline)
        @pipeline = pipeline
        @collection = AtomicProcessingService::StatusCollection.new(pipeline)
      end
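
      # Entry point: processes the pipeline only when it needs processing and an
      # exclusive per-pipeline lease can be obtained, and re-enqueues
      # PipelineProcessWorker when further processing is still needed.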
      def execute
        return unless pipeline.needs_processing?

        # Run the process only if we can obtain an exclusive lease; returns nil if lease is unavailable
        success = try_obtain_lease { process! }

        if success
          # If any jobs changed from stopped to alive status during pipeline processing, we must
          # re-reset their dependent jobs; see https://gitlab.com/gitlab-org/gitlab/-/issues/388539.
          new_alive_jobs.group_by(&:user).each do |user, jobs|
            log_running_reset_skipped_jobs_service(jobs)

            ResetSkippedJobsService.new(project, user).execute(jobs)
          end

          # Re-schedule if we need further processing
          PipelineProcessWorker.perform_async(pipeline.id) if pipeline.needs_processing?
        end

        success
      end

      private

      def process!
        update_stages!
        update_pipeline!
        update_jobs_processed!
        Ci::ExpirePipelineCacheService.new.execute(pipeline)
        true
      end
      def update_stages!
        pipeline.stages.ordered.each { |stage| update_stage!(stage) }
      end

      def update_stage!(stage)
        sorted_update_stage!(stage)

        status = @collection.status_of_stage(stage.position)
        stage.set_status(status)
      end

      def sorted_update_stage!(stage)
        ordered_jobs(stage).each { |job| update_job!(job) }
      end
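
      # Jobs within a stage are returned in dependency order (based on their
      # `needs`), so a job's prerequisites are evaluated before the job itself.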
      def ordered_jobs(stage)
        jobs = load_jobs_in_batches(stage)

        sorted_job_names = sort_jobs(jobs).each_with_index.to_h

        jobs.sort_by { |job| sorted_job_names.fetch(job.name) }
      end

      def load_jobs_in_batches(stage)
        @collection
          .created_job_ids_in_stage(stage.position)
          .in_groups_of(BATCH_SIZE, false)
          .each_with_object([]) do |ids, jobs|
            jobs.concat(load_jobs(ids))
          end
      end

      def load_jobs(ids)
        pipeline
          .current_processable_jobs
          .id_in(ids)
          .with_project_preload
          .created
          .ordered_by_stage
          .select_with_aggregated_needs(project)
      end
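
      # Topologically sorts the job names by their aggregated `needs`.
      # Illustrative input/output only (the job names are made up):
      #
      #   { 'build' => [], 'test' => ['build'], 'deploy' => ['test'] }
      #   # => ['build', 'test', 'deploy']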
      def sort_jobs(jobs)
        Gitlab::Ci::YamlProcessor::Dag.order( # rubocop: disable CodeReuse/ActiveRecord -- this is not ActiveRecord
          jobs.to_h do |job|
            [job.name, job.aggregated_needs_names.to_a]
          end
        )
      end

      def update_pipeline!
        pipeline.set_status(@collection.status_of_all)
      end
      def update_jobs_processed!
        processing = @collection.processing_jobs
        processing.each_slice(BATCH_SIZE) do |slice|
          pipeline.all_jobs.match_id_and_lock_version(slice)
            .update_as_processed!
        end
      end
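
      # Processes a single job: once the jobs it depends on have all completed,
      # any deployment for the job is created and the job is handed to
      # Ci::ProcessBuildService under optimistic locking; the in-memory
      # collection is refreshed so later jobs in this pass see the new status.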
      def update_job!(job)
        previous_status = status_of_previous_jobs(job)

        # We do not continue to process the job if the status of the previous jobs is not completed
        return unless Ci::HasStatus::COMPLETED_STATUSES.include?(previous_status)

        ::Deployments::CreateForJobService.new.execute(job)

        Gitlab::OptimisticLocking.retry_lock(job, name: 'atomic_processing_update_job') do |subject|
          Ci::ProcessBuildService.new(project, subject.user)
            .execute(subject, previous_status)

          # Update the internal representation of the job so that its status
          # change is taken into account during further processing
          @collection.set_job_status(job.id, job.status, job.lock_version)
        end
      end
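
      # Returns the combined status of the jobs this job waits on: its `needs`
      # when the job uses DAG scheduling, otherwise all jobs in earlier stages.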
      def status_of_previous_jobs(job)
        if job.scheduling_type_dag?
          # job uses DAG, get status of all dependent needs
          @collection.status_of_jobs(job.aggregated_needs_names.to_a)
        else
          # job uses Stages, get status of prior stage
          @collection.status_of_jobs_prior_to_stage(job.stage_idx.to_i)
        end
      end

      # Gets the jobs that changed from stopped to alive status since the initial status collection
      # was evaluated. We determine this by checking if their current status is no longer stopped.
      def new_alive_jobs
        initial_stopped_job_names = @collection.stopped_job_names

        return [] if initial_stopped_job_names.empty?

        new_collection = AtomicProcessingService::StatusCollection.new(pipeline)
        new_alive_job_names = initial_stopped_job_names - new_collection.stopped_job_names

        return [] if new_alive_job_names.empty?

        pipeline
          .current_jobs
          .by_name(new_alive_job_names)
          .preload(:user) # rubocop: disable CodeReuse/ActiveRecord
          .to_a
      end

      def project
        pipeline.project
      end
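
      # Scopes the exclusive lease to a single pipeline so that concurrent
      # processing runs for the same pipeline are serialized.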
      def lease_key
        "#{super}::pipeline_id:#{pipeline.id}"
      end

      def lease_timeout
        DEFAULT_LEASE_TIMEOUT
      end

      def lease_taken_log_level
        :info
      end

      def log_running_reset_skipped_jobs_service(jobs)
        Gitlab::AppJsonLogger.info(
          class: self.class.name.to_s,
          message: 'Running ResetSkippedJobsService on new alive jobs',
          project_id: project.id,
          pipeline_id: pipeline.id,
          user_id: jobs.first.user.id,
          jobs_count: jobs.count
        )
      end
    end
  end
end