File: worker_attributes.rb

package info (click to toggle)
gitlab 17.6.5-19
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 629,368 kB
  • sloc: ruby: 1,915,304; javascript: 557,307; sql: 60,639; xml: 6,509; sh: 4,567; makefile: 1,239; python: 406
file content (264 lines) | stat: -rw-r--r-- 9,589 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
# frozen_string_literal: true

module WorkerAttributes
  extend ActiveSupport::Concern
  include Gitlab::ClassAttributes

  # Resource boundaries that workers can declare through
  # `worker_resource_boundary`; any other value is rejected there.
  VALID_RESOURCE_BOUNDARIES = [:memory, :cpu, :unknown].freeze

  # Urgencies that workers can declare through the `urgency` attribute
  VALID_URGENCIES = [:high, :low, :throttled].freeze

  # Ordered in increasing restrictiveness
  VALID_DATA_CONSISTENCIES = [:delayed, :sticky, :always].freeze
  # Consistencies that count as using load balancing
  # (see `utilizes_load_balancing_capabilities?`).
  LOAD_BALANCED_DATA_CONSISTENCIES = [:delayed, :sticky].freeze

  # Consistency used when a worker declares none.
  DEFAULT_DATA_CONSISTENCY = :always
  # Maps every load balancer name to the default consistency. Evaluated once,
  # when this module body is loaded.
  DEFAULT_DATA_CONSISTENCY_PER_DB = Gitlab::Database::LoadBalancing.each_load_balancer.to_h do |lb|
    [lb.name, DEFAULT_DATA_CONSISTENCY]
  end.freeze

  # Weights looked up by `get_weight` via `queue_namespace` when a worker has no
  # explicit weight. Keys are converted to Strings by `stringify_keys`, so
  # `queue_namespace` is presumably a String - confirm against its definition.
  NAMESPACE_WEIGHTS = {
    auto_devops: 2,
    auto_merge: 3,
    chaos: 2,
    deployment: 3,
    mail_scheduler: 2,
    notifications: 2,
    pipeline_cache: 3,
    pipeline_creation: 4,
    pipeline_default: 3,
    pipeline_hooks: 2,
    pipeline_processing: 5,

    # EE-specific
    epics: 2,
    incident_management: 2,
    security_scans: 2
  }.stringify_keys.freeze

  # Default `delay_by` for `defer_on_database_health_signal`.
  DEFAULT_DEFER_DELAY = 5.seconds

  class_methods do
    # Declares the feature category of the worker.
    #
    # value  - stored under the :feature_category class attribute.
    # extras - accepted, but ignored by this method.
    def feature_category(value, *extras)
      set_class_attribute(:feature_category, value)
    end

    # Records whether the feature category of the calling context should take
    # precedence over the one declared on the worker (read back by
    # `get_feature_category`).
    #
    # preference - stored as given; calling this without an argument stores false.
    def prefer_calling_context_feature_category(preference = false)
      set_class_attribute(:prefer_calling_context_feature_category, preference)
    end

    # Special case: if a worker is not owned, get the feature category
    # (if present) from the calling context.
    def get_feature_category
      declared = get_class_attribute(:feature_category)
      prefer_context = get_class_attribute(:prefer_calling_context_feature_category) ? true : false

      # The calling context is only consulted for unowned workers or when the
      # worker explicitly asked for it; the declared category is the fallback
      # when the context carries none.
      if declared == :not_owned || prefer_context
        Gitlab::ApplicationContext.current_context_attribute('meta.feature_category') || declared
      else
        declared
      end
    end

    # Returns true when the resolved feature category (see
    # `get_feature_category`) is :not_owned.
    def feature_category_not_owned?
      get_feature_category == :not_owned
    end

    # This should be set to :high for jobs that need to be run
    # immediately, or, if they are delayed, risk creating
    # inconsistencies in the application that could be perceived by
    # the user as incorrect behavior (ie, a bug)
    #
    # See
    # doc/development/sidekiq_style_guide.md#urgency
    # for details
    def urgency(urgency)
      # Only values listed in VALID_URGENCIES are stored; anything else raises
      # a RuntimeError naming the rejected value.
      if VALID_URGENCIES.include?(urgency)
        set_class_attribute(:urgency, urgency)
      else
        raise "Invalid urgency: #{urgency}"
      end
    end

    # Returns the declared urgency, or :low when none was declared.
    def get_urgency
      declared = get_class_attribute(:urgency)
      return :low unless declared

      declared
    end

    # Allows configuring worker's data_consistency.
    #
    #  Worker can utilize Sidekiq readonly database replicas capabilities by setting data_consistency attribute.
    #  Workers with data_consistency set to :delayed or :sticky, calling #perform_async
    #  will be delayed in order to give replication process enough time to complete.
    #
    #  - *default* - The default data_consistency value. Valid values are:
    #    - 'always' - The job is required to use the primary database (default).
    #    - 'sticky' - The job uses a replica as long as possible. It switches to primary either on write or long replication lag.
    #    - 'delayed' - The job would switch to primary only on write. It would use replica always.
    #      If there's a long replication lag the job will be delayed, and only if the replica is not up to date on the next retry,
    #      it will switch to the primary.
    #  - *overrides* - allows you to override data consistency for specific database connections. Only used in multiple
    #    database mode. Valid for values in `Gitlab::Database.database_base_models.keys`
    #  - *feature_flag* - allows you to toggle a job's `data_consistency`, which permits you to safely toggle load balancing capabilities for a specific job.
    #    If disabled, job will default to `:always`, which means that the job will always use the primary.
    def data_consistency(default, overrides: nil, feature_flag: nil)
      # Validation happens first, so invalid values raise before the
      # "already set" check is reached.
      validate_data_consistency(default, overrides)

      if class_attributes[:data_consistency]
        raise ArgumentError, 'Data consistency is already set'
      end

      set_class_attribute(:data_consistency_feature_flag, feature_flag) if feature_flag
      set_class_attribute(:data_consistency, default)

      # Per-database overrides are only honoured in multiple-database mode;
      # otherwise every database gets the default.
      multiple_databases = Gitlab::Database.database_mode == Gitlab::Database::MODE_MULTIPLE_DATABASES
      effective_overrides = multiple_databases ? overrides : nil
      per_database = compute_data_consistency_per_database(default, effective_overrides)

      set_class_attribute(:data_consistency_per_database, per_database)
    end

    # Checks a default data consistency and optional per-database overrides
    # against VALID_DATA_CONSISTENCIES.
    #
    # data_consistency - the default value to validate.
    # db_specific      - nil/false to skip, otherwise an object whose `values`
    #                    are each validated.
    #
    # Raises ArgumentError on the first invalid input; returns nil otherwise.
    def validate_data_consistency(data_consistency, db_specific)
      unless VALID_DATA_CONSISTENCIES.include?(data_consistency)
        raise ArgumentError, "Invalid data consistency: #{data_consistency}"
      end

      return unless db_specific

      invalid_override = db_specific.values.any? { |dc| !VALID_DATA_CONSISTENCIES.include?(dc) }
      raise ArgumentError, "Invalid data consistency: #{db_specific}" if invalid_override
    end

    # If data_consistency is not set to :always, worker will try to utilize load balancing capabilities and use the replica
    def utilizes_load_balancing_capabilities?
      # True as soon as any database is configured with a load-balanced
      # consistency (:delayed or :sticky).
      consistencies = get_data_consistency_per_database.values
      LOAD_BALANCED_DATA_CONSISTENCIES.any? { |dc| consistencies.include?(dc) }
    end

    # Returns the least restrictive consistency configured for any database,
    # or DEFAULT_DATA_CONSISTENCY when none of the configured values is valid.
    def get_least_restrictive_data_consistency
      configured = get_data_consistency_per_database.values

      # VALID_DATA_CONSISTENCIES is ordered by increasing restrictiveness and
      # Array#& keeps the receiver's order, so the first common element is the
      # least restrictive one in use.
      (VALID_DATA_CONSISTENCIES & configured).first || DEFAULT_DATA_CONSISTENCY
    end

    # Returns the per-database consistency Hash declared by the worker. Falls
    # back to DEFAULT_DATA_CONSISTENCY_PER_DB when the worker's feature flag is
    # disabled or nothing was declared.
    def get_data_consistency_per_database
      return DEFAULT_DATA_CONSISTENCY_PER_DB unless get_data_consistency_feature_flag_enabled?

      get_class_attribute(:data_consistency_per_database) || DEFAULT_DATA_CONSISTENCY_PER_DB
    end

    # Builds the per-database data consistency Hash for a worker.
    #
    # default   - consistency given to every load balancer that has no override;
    #             DEFAULT_DATA_CONSISTENCY is used when it is nil.
    # overrides - optional Hash keyed by load balancer name. It is copied before
    #             being filled in, so the caller's Hash is never modified and
    #             may safely be frozen.
    #
    # Returns a new Hash holding the overrides plus an entry for each load
    # balancer that had none.
    def compute_data_consistency_per_database(default, overrides)
      fallback = default || DEFAULT_DATA_CONSISTENCY
      per_database = overrides ? overrides.dup : {}

      Gitlab::Database::LoadBalancing.each_load_balancer do |lb|
        per_database[lb.name] ||= fallback
      end

      per_database
    end

    # Returns true when the worker declared no data consistency feature flag;
    # otherwise returns the state of that flag for the current request, checked
    # as a :worker type flag.
    def get_data_consistency_feature_flag_enabled?
      flag = get_class_attribute(:data_consistency_feature_flag)
      return true unless flag

      Feature.enabled?(flag, Feature.current_request, type: :worker)
    end

    # Set this attribute on a job when it will call to services outside of the
    # application, such as 3rd party applications, other k8s clusters etc. See
    # doc/development/sidekiq_style_guide.md#jobs-with-external-dependencies for
    # details
    def worker_has_external_dependencies!
      # Stored under :external_dependencies, which is what
      # `worker_has_external_dependencies?` reads back.
      set_class_attribute(:external_dependencies, true)
    end

    # Returns true if the worker has external dependencies.
    # See doc/development/sidekiq_style_guide.md#jobs-with-external-dependencies
    # for details
    def worker_has_external_dependencies?
      # Coerce the stored attribute (possibly nil) to a strict boolean.
      get_class_attribute(:external_dependencies) ? true : false
    end

    # Declares the resource boundary of the worker.
    #
    # boundary - must be one of VALID_RESOURCE_BOUNDARIES.
    #
    # Raises RuntimeError for any other value, without storing it.
    def worker_resource_boundary(boundary)
      if VALID_RESOURCE_BOUNDARIES.include?(boundary)
        set_class_attribute(:resource_boundary, boundary)
      else
        raise "Invalid boundary"
      end
    end

    # Returns the declared resource boundary, or :unknown when none was declared.
    def get_worker_resource_boundary
      declared = get_class_attribute(:resource_boundary)
      return :unknown unless declared

      declared
    end

    # Flags the worker as idempotent by storing true under the :idempotent
    # class attribute (read back by `idempotent?`).
    def idempotent!
      set_class_attribute(:idempotent, true)
    end

    # Returns true when the worker was flagged with `idempotent!`, false
    # otherwise (never nil).
    def idempotent?
      get_class_attribute(:idempotent) ? true : false
    end

    # Stores an explicit weight for the worker. `get_weight` prefers it over
    # the namespace weight and the default of 1.
    def weight(value)
      set_class_attribute(:weight, value)
    end

    # Registers `value` as the pause control strategy for this worker in the
    # PauseControl workers map.
    def pause_control(value)
      workers_map = ::Gitlab::SidekiqMiddleware::PauseControl::WorkersMap
      workers_map.set_strategy_for(strategy: value, worker: self)
    end

    # Returns whatever the PauseControl workers map reports as the strategy
    # for this worker.
    def get_pause_control
      workers_map = ::Gitlab::SidekiqMiddleware::PauseControl::WorkersMap
      workers_map.strategy_for(worker: self)
    end

    # Registers `max_jobs` as the concurrency limit for this worker in the
    # ConcurrencyLimit workers map.
    def concurrency_limit(max_jobs)
      limits = ::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap
      limits.set_limit_for(worker: self, max_jobs: max_jobs)
    end

    # Returns the worker's weight: the explicit `weight` if one was declared,
    # else the NAMESPACE_WEIGHTS entry for `queue_namespace`, else 1.
    def get_weight
      explicit_weight = get_class_attribute(:weight)
      return explicit_weight if explicit_weight

      NAMESPACE_WEIGHTS[queue_namespace] || 1
    end

    # Stores the given values, collected into an Array by the splat, under the
    # :tags class attribute.
    def tags(*values)
      set_class_attribute(:tags, values)
    end

    # Returns the stored tags as an Array. Array() turns a nil attribute into
    # an empty Array.
    def get_tags
      Array(get_class_attribute(:tags))
    end

    # Declares how duplicate jobs of this worker are handled.
    #
    # strategy - stored under :deduplication_strategy (see `get_deduplicate_strategy`).
    # options  - Hash stored under :deduplication_options; `deduplication_enabled?`
    #            reads its :feature_flag key.
    def deduplicate(strategy, options = {})
      set_class_attribute(:deduplication_strategy, strategy)
      set_class_attribute(:deduplication_options, options)
    end

    # Returns the declared deduplication strategy, or DuplicateJob's
    # DEFAULT_STRATEGY when none was declared.
    def get_deduplicate_strategy
      declared = get_class_attribute(:deduplication_strategy)
      return declared if declared

      Gitlab::SidekiqMiddleware::DuplicateJobs::DuplicateJob::DEFAULT_STRATEGY
    end

    # Returns the declared deduplication options, or a new empty Hash when
    # none were declared.
    def get_deduplication_options
      declared = get_class_attribute(:deduplication_options)
      return declared if declared

      {}
    end

    # Returns true when the deduplication options carry no :feature_flag;
    # otherwise returns the state of that flag, checked as a :worker type flag.
    def deduplication_enabled?
      flag = get_deduplication_options[:feature_flag]
      return true unless flag

      Feature.enabled?(flag, type: :worker)
    end

    # Flags the worker by storing true under the :big_payload class attribute
    # (read back by `big_payload?`).
    def big_payload!
      set_class_attribute(:big_payload, true)
    end

    # Returns true when the worker was flagged with `big_payload!`, false
    # otherwise (never nil).
    def big_payload?
      get_class_attribute(:big_payload) ? true : false
    end

    # Stores the database health check settings for the worker under the
    # :database_health_check_attrs class attribute.
    #
    # gitlab_schema - stored as :gitlab_schema.
    # tables        - stored as :tables (defaults to an empty Array).
    # delay_by      - stored as :delay_by (defaults to DEFAULT_DEFER_DELAY).
    # block         - optional block, stored as :block (nil when not given).
    def defer_on_database_health_signal(gitlab_schema, tables = [], delay_by = DEFAULT_DEFER_DELAY, &block)
      health_check_attrs = {
        gitlab_schema: gitlab_schema,
        tables: tables,
        delay_by: delay_by,
        block: block
      }

      set_class_attribute(:database_health_check_attrs, health_check_attrs)
    end

    # Returns true when database health check settings have been stored for
    # this worker (see `defer_on_database_health_signal`), as judged by
    # `present?`.
    def defer_on_database_health_signal?
      database_health_check_attrs.present?
    end

    # Returns the value stored by `defer_on_database_health_signal`: a Hash with
    # :gitlab_schema, :tables, :delay_by and :block keys.
    def database_health_check_attrs
      get_class_attribute(:database_health_check_attrs)
    end
  end
end