File: manager.rb

package info (click to toggle)
ruby-sidekiq 7.3.2+dfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 956 kB
  • sloc: ruby: 6,094; javascript: 526; makefile: 21; sh: 20
file content (134 lines) | stat: -rw-r--r-- 3,871 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# frozen_string_literal: true

require "sidekiq/processor"
require "set"

module Sidekiq
  ##
  # The Manager is the central coordination point in Sidekiq, controlling
  # the lifecycle of the Processors.
  #
  # Tasks:
  #
  # 1. start: Spin up Processors.
  # 2. processor_result: Forget a Processor that has exited and, unless we
  #    are quieting/stopping, replace it with a freshly started one.
  # 3. quiet: Stop replacing Processors and ask every Processor to terminate.
  # 4. stop: hard stop the Processors by deadline.
  #
  # Note that only the last task requires its own Thread since it has to monitor
  # the shutdown process.  The other tasks are performed by other threads.
  #
  class Manager
    include Sidekiq::Component

    # The current set of Processors. Mutations happen under @plock.
    attr_reader :workers
    # The capsule this Manager was built for (also used as the Processor config).
    attr_reader :capsule

    # Builds (but does not start) +capsule.concurrency+ Processors.
    #
    # @param capsule the capsule providing +concurrency+; it is also handed to
    #   each Processor as its config.
    # @raise [ArgumentError] if the concurrency is less than 1.
    def initialize(capsule)
      @config = @capsule = capsule
      @count = capsule.concurrency
      raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1

      @done = false
      @workers = Set.new
      @plock = Mutex.new
      @count.times do
        @workers << build_processor
      end
    end

    # Starts every Processor created by #initialize.
    def start
      @workers.each(&:start)
    end

    # Stops replacing Processors and asks every current Processor to
    # terminate. Only the first call has any effect.
    def quiet
      # Flip @done and snapshot the pool under the same lock that
      # processor_result holds. After this point no replacement can be added.
      # We also never iterate @workers while another thread inserts into it,
      # which would raise "can't add a new key into hash during iteration".
      to_terminate = @plock.synchronize do
        next nil if @done
        @done = true
        @workers.dup
      end
      return unless to_terminate

      logger.info { "Terminating quiet threads for #{capsule.name} capsule" }
      to_terminate.each(&:terminate)
    end

    # Quiets the Manager, waits until +deadline+ for the Processors to
    # finish, then forcibly shuts down whatever is still running. The
    # capsule is always stopped on the way out.
    #
    # @param deadline [Numeric] a Process::CLOCK_MONOTONIC timestamp (seconds)
    #   after which remaining Processors are killed.
    def stop(deadline)
      quiet

      # some of the shutdown events can be async,
      # we don't have any way to know when they're done but
      # give them a little time to take effect
      sleep PAUSE_TIME
      return if @workers.empty?

      logger.info { "Pausing to allow jobs to finish..." }
      wait_for(deadline) { @workers.empty? }
      return if @workers.empty?

      hard_shutdown
    ensure
      capsule.stop
    end

    # Callback given to every Processor. Removes +processor+ from the pool
    # and, unless #quiet has been called, creates and starts a replacement.
    #
    # @param processor the Processor that is exiting.
    # @param reason supplied by the Processor; unused here (presumably the
    #   exception that ended it, or nil — confirm in Processor).
    def processor_result(processor, reason = nil)
      @plock.synchronize do
        @workers.delete(processor)
        unless @done
          replacement = build_processor
          @workers << replacement
          replacement.start
        end
      end
    end

    # @return [Boolean] true once #quiet (or #stop) has been called.
    def stopped?
      @done
    end

    private

    # Creates a Processor wired to report back through #processor_result.
    def build_processor
      Processor.new(@config, &method(:processor_result))
    end

    # Requeues the jobs of still-busy Processors, kills their threads, and
    # waits up to 3 seconds for the pool to drain.
    def hard_shutdown
      # We've reached the timeout and we still have busy threads.
      # They must die but their jobs shall live on.
      cleanup = @plock.synchronize { @workers.dup }

      unless cleanup.empty?
        jobs = cleanup.map(&:job).compact

        logger.warn { "Terminating #{cleanup.size} busy threads" }
        logger.debug { "Jobs still in progress #{jobs.inspect}" }

        # Re-enqueue unfinished jobs
        # NOTE: You may notice that we may push a job back to redis before
        # the thread is terminated. This is ok because Sidekiq's
        # contract says that jobs are run AT LEAST once. Process termination
        # is delayed until we're certain the jobs are back in Redis because
        # it is worse to lose a job than to run it twice.
        capsule.fetcher.bulk_requeue(jobs)
      end

      cleanup.each(&:kill)

      # when this method returns, we immediately call `exit` which may not give
      # the remaining threads time to run `ensure` blocks, etc. We pause here up
      # to 3 seconds to give threads a minimal amount of time to run `ensure` blocks.
      deadline = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + 3
      wait_for(deadline) { @workers.empty? }
    end

    # hack for quicker development / testing environment #2774
    PAUSE_TIME = $stdout.tty? ? 0.1 : 0.5

    # Polls +condblock+ every PAUSE_TIME seconds and returns as soon as it is
    # truthy. It also returns once less than PAUSE_TIME remains before
    # +deadline+, a Process::CLOCK_MONOTONIC timestamp. Callers must
    # re-check the condition themselves afterwards.
    def wait_for(deadline, &condblock)
      remaining = deadline - ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
      while remaining > PAUSE_TIME
        return if condblock.call
        sleep PAUSE_TIME
        remaining = deadline - ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
      end
    end
  end
end