File: scheduled.rb

package info (click to toggle)
ruby-sidekiq 3.2.6~dfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 1,128 kB
  • ctags: 1,222
  • sloc: ruby: 5,848; makefile: 37; sh: 4
file content (101 lines) | stat: -rw-r--r-- 3,951 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
require 'sidekiq'
require 'sidekiq/util'
require 'sidekiq/actor'
require 'sidekiq/api'

module Sidekiq
  module Scheduled

    # Seconds every process sleeps before its first poll, giving the
    # heartbeat time to register (see Poller#initial_wait).
    INITIAL_WAIT = 10

    ##
    # The Poller checks Redis every N seconds for jobs in the retry or scheduled
    # set that have passed their timestamp and should be enqueued.  If so, it
    # just pops the job back onto its original queue so the
    # workers can pick it up like any other job.
    class Poller
      include Util
      include Actor

      # Names of the Redis sorted sets scanned on every poll.
      SETS = %w(retry schedule)

      # Polling budget, in seconds, contributed by each Sidekiq process when
      # poll_interval is computed automatically.
      SECONDS_PER_PROCESS = 15

      # Runs one polling pass over every set in SETS and then schedules the
      # next pass with `after`.
      #
      # Any StandardError raised during the pass is logged and swallowed so
      # that the next pass is still scheduled.
      #
      # first_time - when true, sleep via #initial_wait before polling.
      def poll(first_time=false)
        watchdog('scheduling poller thread died!') do
          initial_wait if first_time

          begin
            # A job's "score" in Redis is the time at which it should be processed.
            # Just check Redis for the set of jobs with a timestamp before now.
            now = Time.now.to_f.to_s
            Sidekiq.redis do |conn|
              SETS.each { |sorted_set| enqueue_due_jobs(conn, sorted_set, now) }
            end
          rescue => ex
            # Most likely a problem with redis networking.
            # Punt and try again at the next interval
            log_poller_error(ex)
          end

          # Randomizing scales the interval by half since
          # on average calling `rand` returns 0.5.
          # We make up for this by doubling the interval
          after(poll_interval * 2 * rand) { poll }
        end
      end

      private

      # Moves every job in +sorted_set+ whose score (time to execute) is <= +now+
      # onto its work queue via Sidekiq::Client.push.
      #
      # We go through the set one job at a time to reduce the risk of something
      # going wrong between the time jobs are popped from the scheduled set and
      # when they are pushed onto a work queue, which would lose the jobs.
      def enqueue_due_jobs(conn, sorted_set, now)
        while (job = conn.zrangebyscore(sorted_set, '-inf', now, :limit => [0, 1]).first)
          # Pop item off the set and add it to the work queue. If the job can't be
          # popped from the set, it's because another process already popped it so
          # we can move on to the next one.
          next unless conn.zrem(sorted_set, job)

          Sidekiq::Client.push(Sidekiq.load_json(job))
          logger.debug { "enqueued #{sorted_set}: #{job}" }
        end
      end

      # We do our best to tune poll_interval to the size of the active Sidekiq
      # cluster.  If you have 30 processes and poll every 15 seconds, that means one
      # Sidekiq is checking Redis every 0.5 seconds - way too often for most people
      # and really bad if the retry or scheduled sets are large.
      #
      # Instead try to avoid polling more than once every 15 seconds.  If you have
      # 30 Sidekiq processes, poll_interval becomes 30 * 15 or 450 seconds.
      # To keep things statistically random, #poll sleeps a random amount between
      # 0 and twice that (900 seconds) for each poll, or 450 seconds on average.
      # Otherwise restarting all your Sidekiq processes at the same time will lead
      # to them all polling at the same time: the thundering herd problem.
      #
      # We only do this if poll_interval is unset (the default); the computed
      # value is then stored in Sidekiq.options[:poll_interval] and reused.
      #
      # Counting the processes talks to Redis.  If that fails, the error is logged
      # and the single-process interval is returned WITHOUT being stored, so the
      # caller can still schedule the next poll and the real value is computed
      # on a later call once Redis is reachable again.
      def poll_interval
        Sidekiq.options[:poll_interval] ||= begin
          process_count = Sidekiq::ProcessSet.new.size
          [process_count, 1].max * SECONDS_PER_PROCESS
        end
      rescue => ex
        log_poller_error(ex)
        SECONDS_PER_PROCESS
      end

      # Sleeps before the very first poll.  A Celluloid task termination raised
      # while sleeping is swallowed on purpose.
      def initial_wait
        # Have all processes sleep between 10-15 seconds.  10 seconds
        # to give time for the heartbeat to register and 5 random seconds
        # to ensure they don't all hit Redis at the same time.
        sleep(INITIAL_WAIT)
        sleep(5 * rand)
      rescue Celluloid::Task::TerminatedError
        # Hit Ctrl-C when Sidekiq is finished booting and we have a chance
        # to get here.
      end

      # Logs the message and first backtrace line of +ex+.  The backtrace is
      # guarded because Exception#backtrace may be nil.
      def log_poller_error(ex)
        logger.error ex.message
        logger.error ex.backtrace.first if ex.backtrace
      end

    end
  end
end