File: capsule.rb

package info (click to toggle)
ruby-sidekiq 7.3.2%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 956 kB
  • sloc: ruby: 6,094; javascript: 526; makefile: 21; sh: 20
file content (132 lines) | stat: -rw-r--r-- 3,826 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# frozen_string_literal: true

require "sidekiq/component"

module Sidekiq
  # A Sidekiq::Capsule is the set of resources necessary to
  # process one or more queues with a given concurrency.
  # One "default" Capsule is started but the user may declare additional
  # Capsules in their initializer.
  #
  # For example, the capsule declared below pulls jobs from the "single" queue
  # and processes them with one thread, meaning the jobs are processed serially.
  #
  # Sidekiq.configure_server do |config|
  #   config.capsule("single-threaded") do |cap|
  #     cap.concurrency = 1
  #     cap.queues = %w(single)
  #   end
  # end
  class Capsule
    include Sidekiq::Component
    extend Forwardable

    attr_reader :name
    attr_reader :queues
    attr_accessor :concurrency
    attr_reader :mode
    attr_reader :weights

    def_delegators :@config, :[], :[]=, :fetch, :key?, :has_key?, :merge!, :dig

    # Builds a capsule called +name+ on top of the given +config+.
    #
    # A new capsule starts out with a single "default" queue of weight 0 in
    # :strict mode; its concurrency is read from config[:concurrency].
    def initialize(name, config)
      @name = name
      @config = config
      @concurrency = config[:concurrency]

      # Initial queue setup, replaced as a whole by #queues=.
      @mode = :strict
      @queues = ["default"]
      @weights = {"default" => 0}
    end

    # Returns the fetcher for this capsule, creating it on first use.
    #
    # The fetcher is an instance of config[:fetch_class] (Sidekiq::BasicFetch
    # when none is configured) built with this capsule as its only argument.
    # If the instance responds to +setup+, it is called once with
    # config[:fetch_setup]. The instance is memoized in @fetcher.
    def fetcher
      return @fetcher if @fetcher

      fetch_class = config[:fetch_class] || Sidekiq::BasicFetch
      instance = fetch_class.new(self)
      instance.setup(config[:fetch_setup]) if instance.respond_to?(:setup)
      @fetcher = instance
    end

    # Passes an empty job list to the fetcher's bulk_requeue and returns
    # whatever that call returns.
    # NOTE(review): presumably this lets the fetcher hand back any jobs it is
    # holding internally when the capsule shuts down — confirm against the
    # configured fetch class.
    def stop
      fetcher&.bulk_requeue([])
    end

    # Sidekiq checks queues in three modes:
    # - :strict - all queues have 0 weight and are checked strictly in order
    # - :weighted - queues have arbitrary weight between 1 and N
    # - :random - all queues have weight of 1
    #
    # Accepts a single entry or a list of entries. Each entry is either a
    # "name" / "name,weight" String or a [name, weight] pair; a missing weight
    # counts as 0. Every queue is listed in #queues once per unit of weight
    # (and at least once), #weights maps each name to its integer weight, and
    # #mode is derived from the collected weights.
    def queues=(val)
      @weights = {}
      expanded = []
      Array(val).each do |entry|
        name, weight = entry.is_a?(String) ? entry.split(",") : entry
        numeric_weight = weight.to_i
        @weights[name] = numeric_weight
        # A weight below 1 still lists the queue exactly once.
        expanded.concat(Array.new([numeric_weight, 1].max, name))
      end
      @queues = expanded

      weight_values = @weights.values
      @mode =
        if weight_values.all?(&:zero?)
          :strict
        elsif weight_values.all? { |w| w == 1 }
          :random
        else
          :weighted
        end
    end

    # Allow the middleware to be different per-capsule.
    # Avoid if possible and add middleware globally so all
    # capsules share the same chains. Easier to debug that way.
    #
    # On first use the chain is created by calling copy_for(self) on
    # config.client_middleware and memoized. When a block is given the chain
    # is yielded to it; the chain is returned either way.
    def client_middleware
      chain = (@client_chain ||= config.client_middleware.copy_for(self))
      yield chain if block_given?
      chain
    end

    # Returns this capsule's server middleware chain.
    #
    # On first use the chain is created by calling copy_for(self) on
    # config.server_middleware and memoized. When a block is given the chain
    # is yielded to it; the chain is returned either way.
    def server_middleware
      chain = (@server_chain ||= config.server_middleware.copy_for(self))
      yield chain if block_given?
      chain
    end

    # Returns the Redis pool to use for the current call.
    #
    # A pool stored under Thread.current[:sidekiq_redis_pool] takes
    # precedence; otherwise the capsule's own pool (#local_redis_pool) is used.
    def redis_pool
      thread_pool = Thread.current[:sidekiq_redis_pool]
      return thread_pool if thread_pool

      local_redis_pool
    end

    # Returns the Redis pool owned by this capsule, creating it on first use
    # through config.new_redis_pool with the capsule's concurrency and name.
    def local_redis_pool
      return @redis if @redis

      # connection pool is lazy, it will not create connections unless you actually need them
      # so don't be skimpy!
      @redis = config.new_redis_pool(@concurrency, name)
    end

    # Checks a connection out of #redis_pool, yields it to the block and
    # returns the block's result.
    #
    # Raises ArgumentError when called without a block. If the block raises a
    # RedisClientAdapter::BaseError whose message contains READONLY,
    # NOREPLICAS or UNBLOCKED, the connection is closed and the block is run
    # one more time; any other error, or a second failure, is re-raised.
    def redis
      raise ArgumentError, "requires a block" unless block_given?
      redis_pool.with do |conn|
        already_retried = false
        begin
          yield conn
        rescue RedisClientAdapter::BaseError => ex
          # 2550 Failover can cause the server to become a replica, need
          # to disconnect and reopen the socket to get back to the primary.
          # 4495 Use the same logic if we have a "Not enough replicas" error from the primary
          # 4985 Use the same logic when a blocking command is force-unblocked
          # The same retry logic is also used in client.rb
          raise if already_retried || !/READONLY|NOREPLICAS|UNBLOCKED/.match?(ex.message)

          conn.close
          already_retried = true
          retry
        end
      end
    end

    # Delegates to config.lookup with the given +name+ and returns its result.
    def lookup(name)
      config.lookup(name)
    end

    # Returns the logger provided by config.
    def logger
      config.logger
    end
  end
end