1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
|
# frozen_string_literal: true
module BulkImports
  # Sidekiq worker that walks a bulk import entity through its pipeline stages.
  #
  # Each run either reports the stage that is currently running or, when no
  # tracker is running, starts the next stage. It then schedules itself again
  # after PERFORM_DELAY. The cycle stops once the entity is no longer started.
  class EntityWorker
    include ApplicationWorker
    include ExclusiveLeaseGuard

    # Delay between two consecutive runs for the same entity; also used as the
    # lease timeout (see #lease_timeout).
    PERFORM_DELAY = 5.seconds

    idempotent!
    deduplicate :until_executing
    data_consistency :sticky
    feature_category :importers
    sidekiq_options retry: 3, dead: false
    worker_has_external_dependencies!

    # When retries are exhausted, report the failure against the entity whose
    # id is the job's first argument.
    sidekiq_retries_exhausted do |job, error|
      new.perform_failure(error, job['args'].first)
    end

    # Runs one iteration of the stage loop for the given entity.
    #
    # @param entity_id [Integer] id of the ::BulkImports::Entity to process
    def perform(entity_id)
      @entity = ::BulkImports::Entity.find(entity_id)

      # Nothing to do, and no re-enqueue, unless the entity is started.
      return unless @entity.started?

      if running_tracker.blank?
        # Use lease guard to prevent duplicated workers from starting multiple stages
        try_obtain_lease { start_next_stage }
      else
        log_info(message: 'Stage running', entity_stage: running_tracker.stage)
      end

      re_enqueue
    end

    # Reports the exception to error tracking and fails the entity.
    #
    # @param exception [Exception] error that made the job fail
    # @param entity_id [Integer] id of the ::BulkImports::Entity to fail
    def perform_failure(exception, entity_id)
      @entity = ::BulkImports::Entity.find(entity_id)

      failure_context = { message: "Request to export #{entity.source_type} failed" }
      Gitlab::ErrorTracking.track_exception(exception, failure_context.merge(logger.default_attributes))

      entity.fail_op!
    end

    private

    attr_reader :entity

    # Schedules another run of this worker for the same entity after
    # PERFORM_DELAY.
    def re_enqueue
      with_context(bulk_import_entity_id: entity.id) do
        BulkImports::EntityWorker.perform_in(PERFORM_DELAY, entity.id)
      end
    end

    # First tracker returned by Tracker.running_trackers for the entity,
    # memoized when truthy.
    def running_tracker
      @running_tracker ||= BulkImports::Tracker.running_trackers(entity.id).first
    end

    # Applies `status_event: 'enqueue'` to the trackers returned by
    # Tracker.next_pipeline_trackers_for and returns the result of `update`.
    def next_pipeline_trackers_for(entity_id)
      BulkImports::Tracker.next_pipeline_trackers_for(entity_id).update(status_event: 'enqueue')
    end

    # Enqueues a PipelineWorker for every tracker of the next stage. The stage
    # start is logged once, on the first tracker.
    def start_next_stage
      next_pipeline_trackers_for(entity.id).each_with_index do |tracker, position|
        log_info(message: 'Stage starting', entity_stage: tracker.stage) if position.zero?

        with_context(bulk_import_entity_id: entity.id) do
          BulkImports::PipelineWorker.perform_async(tracker.id, tracker.stage, entity.id)
          enqueue_placeholder_references_loading
        end
      end
    end

    # Enqueues LoadPlaceholderReferencesWorker for the entity's bulk import,
    # but only when importer user mapping is enabled for that import.
    def enqueue_placeholder_references_loading
      bulk_import = entity.bulk_import
      return unless Import::BulkImports::EphemeralData.new(bulk_import.id).importer_user_mapping_enabled?

      Import::LoadPlaceholderReferencesWorker.perform_async(
        Import::SOURCE_DIRECT_TRANSFER,
        bulk_import.id,
        'current_user_id' => bulk_import.user_id
      )
    end

    # NOTE(review): lease_timeout, lease_key and log_lease_taken are presumably
    # hooks consumed by ExclusiveLeaseGuard - confirm against that module.
    def lease_timeout
      PERFORM_DELAY
    end

    # One lease per entity.
    def lease_key
      "gitlab:bulk_imports:entity_worker:#{entity.id}"
    end

    def log_lease_taken
      log_info(message: lease_taken_message)
    end

    # Logger bound to the current entity, memoized.
    def logger
      @logger ||= Logger.build.with_entity(entity)
    end

    # Logs the payload at info level after passing it through
    # structured_payload.
    def log_info(payload)
      logger.info(structured_payload(payload))
    end
  end
end
|