File: manager.rb

package info (click to toggle)
ruby-sidekiq 3.2.6~dfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 1,128 kB
  • ctags: 1,222
  • sloc: ruby: 5,848; makefile: 37; sh: 4
file content (225 lines) | stat: -rw-r--r-- 5,998 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# encoding: utf-8
require 'sidekiq/util'
require 'sidekiq/actor'
require 'sidekiq/processor'
require 'sidekiq/fetch'

module Sidekiq

  ##
  # The main router in the system.  This
  # manages the processor state and accepts messages
  # from Redis to be dispatched to an idle processor.
  #
  class Manager
    include Util
    include Actor
    trap_exit :processor_died

    attr_reader :ready
    attr_reader :busy
    attr_accessor :fetcher

    SPIN_TIME_FOR_GRACEFUL_SHUTDOWN = 1

    # Sets up the bookkeeping state and the pool of idle processors.
    #
    # options - Hash of settings. :concurrency is the number of processors
    #           to create (25 when not given). The hash is kept in @options
    #           and passed to the fetch strategy when jobs are requeued.
    def initialize(options={})
      logger.debug { options.inspect }

      @options       = options
      @count         = options[:concurrency] || 25
      @done          = false
      @done_callback = nil
      @in_progress   = {}
      @threads       = {}
      @busy          = []

      # Each processor is created through Processor.new_link against the
      # current actor and is handed the id the manager files it under
      # (the same id later used as the key in #real_thread).
      @ready = @count.times.map do
        processor = Processor.new_link(current_actor)
        processor.proxy_id = processor.object_id
        processor
      end
    end

    # Stops accepting work and winds the manager down.
    #
    # Marks the manager as done, terminates every idle processor that is
    # still alive and empties the ready pool.
    #
    # options - Hash with :shutdown (truthy to arm the hard-shutdown timer
    #           when processors are still busy) and :timeout (the delay
    #           handed to hard_shutdown_in).
    def stop(options={})
      watchdog('Manager#stop died') do
        force = options[:shutdown]
        grace_period = options[:timeout]

        @done = true

        logger.info { "Shutting down #{@ready.size} quiet workers" }
        @ready.each do |idle|
          idle.terminate if idle.alive?
        end
        @ready.clear

        # When nothing is busy, shutdown has already been triggered by the
        # clean-up call; otherwise only arm the hard deadline on request.
        drained = clean_up_for_graceful_shutdown
        hard_shutdown_in(grace_period) if force && !drained
      end
    end

    # Finishes the graceful shutdown once no processor is busy.
    #
    # Calls shutdown and returns true when @busy is empty. Otherwise it
    # schedules another check SPIN_TIME_FOR_GRACEFUL_SHUTDOWN seconds from
    # now and returns false.
    def clean_up_for_graceful_shutdown
      drained = @busy.empty?

      if drained
        shutdown
      else
        after(SPIN_TIME_FOR_GRACEFUL_SHUTDOWN) { clean_up_for_graceful_shutdown }
      end

      drained
    end

    # Kicks off work: dispatch is invoked once per processor currently in
    # the ready pool.
    def start
      @ready.each do |_processor|
        dispatch
      end
    end

    # Registers the block that processor_done calls with each processor
    # that reports in. A later call replaces the earlier block.
    def when_done(&callback)
      @done_callback = callback
    end

    # Called when a processor reports that it has finished its work.
    #
    # Runs the when_done callback (if one is registered), forgets the
    # processor's in-progress work and thread, and removes it from the busy
    # list. While running, a live processor goes back to the ready pool;
    # once stopped, the processor is terminated and shutdown is triggered
    # when nothing is busy any more. dispatch is invoked in both cases.
    def processor_done(processor)
      watchdog('Manager#processor_done died') do
        @done_callback.call(processor) if @done_callback

        [@in_progress, @threads].each do |registry|
          registry.delete(processor.object_id)
        end
        @busy.delete(processor)

        if stopped?
          processor.terminate if processor.alive?
          shutdown if @busy.empty?
        elsif processor.alive?
          @ready << processor
        end

        dispatch
      end
    end

    # Exit handler registered with trap_exit; receives the processor that
    # went away and the reason (the reason is not used here).
    #
    # Drops the processor from all bookkeeping. Once stopped, shutdown is
    # triggered as soon as nothing is busy. While running, a replacement
    # processor is created, added to the ready pool and dispatch is invoked.
    def processor_died(processor, reason)
      watchdog("Manager#processor_died died") do
        @in_progress.delete(processor.object_id)
        @threads.delete(processor.object_id)
        @busy.delete(processor)

        if stopped?
          shutdown if @busy.empty?
        else
          replacement = Processor.new_link(current_actor)
          replacement.proxy_id = replacement.object_id
          @ready << replacement
          dispatch
        end
      end
    end

    # Hands a fetched unit of work to an idle processor.
    #
    # work - the fetched unit of work; it must respond to requeue.
    #
    # The work is pushed back to redis instead of being processed when the
    # manager is stopped or when no processor is ready. In the latter case
    # the bookkeeping (@in_progress, @busy) is left untouched, so a nil
    # entry can never end up in the busy list.
    def assign(work)
      watchdog("Manager#assign died") do
        if stopped?
          # Race condition between Manager#stop if Fetcher
          # is blocked on redis and gets a message after
          # all the ready Processors have been stopped.
          # Push the message back to redis.
          work.requeue
        elsif (processor = @ready.pop)
          @in_progress[processor.object_id] = work
          @busy << processor
          processor.async.process(work)
        else
          # More fetches were outstanding than processors are ready.
          # Requeue rather than record the work against a missing processor.
          logger.warn { "No ready processor for fetched work, pushing it back to redis" }
          work.requeue
        end
      end
    end

    # A hack worthy of Rube Goldberg.  We need to be able to hard stop a
    # working thread, but there is no way for us to get a handle to the
    # underlying thread performing work for a processor, so we have the
    # processor call us and tell us.
    #
    # proxy_id - the id the processor was given at creation.
    # thr      - the thread to remember for that processor.
    def real_thread(proxy_id, thr)
      @threads.store(proxy_id, thr)
    end

    # Refreshes the process title, publishes one heartbeat and schedules
    # the next beat five seconds later.
    #
    # key  - key under which this process is registered in Redis.
    # data - Hash of process info; 'tag' (optional) and 'concurrency' are
    #        shown in the process title.
    # json - info payload stored alongside the heartbeat.
    def heartbeat(key, data, json)
      tag = data['tag']

      proctitle = ['sidekiq', Sidekiq::VERSION]
      # A missing tag is treated like an empty one; raising here would
      # also stop the heartbeat from being rescheduled.
      proctitle << tag if tag && !tag.empty?
      proctitle << "[#{@busy.size} of #{data['concurrency']} busy]"
      proctitle << 'stopping' if stopped?
      $0 = proctitle.join(' ')

      ❤(key, json)
      after(5) do
        heartbeat(key, data, json)
      end
    end

    private

    # Writes one heartbeat to Redis and acts on a pending signal, if any.
    #
    # Inside a single MULTI it adds key to the 'processes' set, stores the
    # info payload, busy count and current time on the key's hash, sets a
    # 60 second expiry on the key and pops one entry from "<key>-signals".
    # A popped entry is sent to this process with Process.kill.
    # Any StandardError is logged and otherwise ignored.
    def ❤(key, json)
      _added, _stored, _expiring, pending_signal = Sidekiq.redis do |conn|
        conn.multi do
          conn.sadd('processes', key)
          conn.hmset(key, 'info', json, 'busy', @busy.size, 'beat', Time.now.to_f)
          conn.expire(key, 60)
          conn.rpop("#{key}-signals")
        end
      end

      ::Process.kill(pending_signal, $$) if pending_signal
    rescue => e
      # ignore all redis/network issues
      logger.error("heartbeat: #{e.message}")
    end

    # Arms the hard-shutdown deadline.
    #
    # delay - seconds to wait before busy processors are interrupted.
    #
    # When the timer fires, in-progress work is requeued first, then
    # Shutdown is raised in the recorded thread of every busy processor
    # that is still alive, and finally shutdown is signalled.
    def hard_shutdown_in(delay)
      logger.info { "Pausing up to #{delay} seconds to allow workers to finish..." }

      after(delay) do
        watchdog("Manager#hard_shutdown_in died") do
          # We've reached the timeout and we still have busy workers.
          # They must die but their messages shall live on.
          logger.warn { "Terminating #{@busy.size} busy worker threads" }
          logger.warn { "Work still in progress #{@in_progress.values.inspect}" }

          requeue

          @busy.each do |processor|
            next unless processor.alive?

            # Processors that never reported a thread have nothing to interrupt.
            worker_thread = @threads.delete(processor.object_id)
            worker_thread.raise(Shutdown) if worker_thread
          end

          signal_shutdown
        end
      end
    end

    # Asks the fetcher (asynchronously) to fetch work, unless the manager
    # has been stopped.
    #
    # Raises RuntimeError when no processor is ready to take the work.
    def dispatch
      return if stopped?

      if @ready.empty?
        # This is a safety check to ensure we haven't leaked
        # processors somehow.
        raise "BUG: No processors, cannot continue!" if @busy.empty?
        raise "No ready processor!?"
      end

      @fetcher.async.fetch
    end

    # Returns the @done flag: false after initialize, true once #stop has
    # run.
    def stopped?
      @done
    end

    # Pushes any work still recorded as in progress back through requeue,
    # then schedules the :shutdown signal via signal_shutdown.
    def shutdown
      requeue
      signal_shutdown
    end

    # Schedules signal(:shutdown) on a zero-delay timer instead of calling
    # it inline.
    def signal_shutdown
      after(0) do
        signal(:shutdown)
      end
    end

    # Re-enqueues every job still recorded as in progress and then forgets
    # them.
    #
    # NOTE: A job may be pushed back to redis before the worker thread
    # running it is terminated. This is ok because Sidekiq's contract says
    # that jobs are run AT LEAST once. Process termination is delayed until
    # we're certain the jobs are back in Redis because it is worse to lose
    # a job than to run it twice.
    def requeue
      strategy = Sidekiq::Fetcher.strategy
      unfinished = @in_progress.values

      strategy.bulk_requeue(unfinished, @options)
      @in_progress.clear
    end
  end
end