File: processor.rb

package info (click to toggle)
ruby-sidekiq 3.2.6~dfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 1,128 kB
  • ctags: 1,222
  • sloc: ruby: 5,848; makefile: 37; sh: 4
file content (151 lines) | stat: -rw-r--r-- 4,432 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
require 'sidekiq/util'
require 'sidekiq/actor'

require 'sidekiq/middleware/server/retry_jobs'
require 'sidekiq/middleware/server/logging'

module Sidekiq
  ##
  # A Processor is handed a unit of work by the Manager and carries it
  # out: it builds the worker instance, pushes the job through the
  # server middleware chain and finally invokes Sidekiq::Worker#perform.
  class Processor
    # Expiry (in seconds, roughly five years) applied to the per-day stat
    # counters.  The counters must expire eventually so they cannot leak
    # memory, but they are tiny, so they are kept for a long time.
    STATS_TIMEOUT = 5 * 365 * 24 * 60 * 60

    include Util
    include Actor

    # Builds a middleware chain containing the Logging and RetryJobs
    # server middleware, plus the ActiveRecord server middleware when
    # ::ActiveRecord::Base is defined at the time of the call.
    def self.default_middleware
      Middleware::Chain.new do |chain|
        chain.add Middleware::Server::Logging
        chain.add Middleware::Server::RetryJobs
        next unless defined?(::ActiveRecord::Base)

        # Only loaded on demand, so ActiveRecord stays an optional dependency.
        require 'sidekiq/middleware/server/active_record'
        chain.add Sidekiq::Middleware::Server::ActiveRecord
      end
    end

    attr_accessor :proxy_id

    # boss - the object that is notified (asynchronously) about the thread
    #        running a job and about job completion.
    def initialize(boss)
      @boss = boss
    end

    # Runs a single unit of work.
    #
    # work - an object responding to #message (the serialized job),
    #        #queue_name and #acknowledge.
    #
    # The work is acknowledged in every case except when the job is
    # interrupted by Sidekiq::Shutdown.  Any other exception is passed to
    # handle_exception and then re-raised; in that case the boss is not
    # told that the processor is done.
    def process(work)
      raw_message = work.message
      queue_name = work.queue_name

      # Report the thread that is executing this job to the boss.
      @boss.async.real_thread(proxy_id, Thread.current)

      should_ack = true
      begin
        job = Sidekiq.load_json(raw_message)
        worker = job['class'].constantize.new
        worker.jid = job['jid']

        stats(worker, job, queue_name) do
          Sidekiq.server_middleware.invoke(worker, job, queue_name) do
            execute_job(worker, cloned(job['args']))
          end
        end
      rescue Sidekiq::Shutdown
        # The job was forcibly terminated because it did not finish
        # within the timeout.  It never completed, so the work must
        # not be acknowledged.
        should_ack = false
      rescue Exception => error
        # job is still nil when the payload itself could not be parsed;
        # fall back to the raw string so the failure context is not lost.
        handle_exception(error, job || { :message => raw_message })
        raise
      ensure
        work.acknowledge if should_ack
      end

      @boss.async.processor_done(current_actor)
    end

    # Short identifier for this processor: the object id in hexadecimal.
    def inspect
      hex_id = object_id.to_s(16)
      "<Processor##{hex_id}>"
    end

    # Invokes the worker's #perform with the (already cloned) job arguments.
    def execute_job(worker, cloned_args)
      worker.perform(*cloned_args)
    end

    private

    # Base-36 object id of the thread that first called this method; the
    # value is memoized for the lifetime of the processor.
    def thread_identity
      return @str if @str

      @str = Thread.current.object_id.to_s(36)
    end

    # Wraps the execution of a job (the given block) with bookkeeping in
    # Redis:
    #
    # * before the job runs, an entry describing it is stored in the
    #   "<identity>:workers" hash under this processor's thread identity;
    # * if the job raises, the global and per-day "failed" counters are
    #   incremented and the exception is re-raised;
    # * in every case the workers entry is removed and the global and
    #   per-day "processed" counters are incremented.
    #
    # Every Redis interaction goes through retry_and_suppress_exceptions
    # so that a failure to update stats is never confused with a failure
    # of the job itself.
    def stats(worker, msg, queue)
      retry_and_suppress_exceptions do
        entry = Sidekiq.dump_json({ :queue => queue, :payload => msg, :run_at => Time.now.to_i })
        Sidekiq.redis do |redis|
          redis.multi do
            redis.hmset("#{identity}:workers", thread_identity, entry)
            redis.expire("#{identity}:workers", 4 * 60 * 60)
          end
        end
      end

      begin
        yield
      rescue Exception
        retry_and_suppress_exceptions do
          Sidekiq.redis do |redis|
            daily_failed = "stat:failed:#{Time.now.utc.to_date}"
            replies = redis.multi do
              redis.incrby("stat:failed", 1)
              redis.incrby(daily_failed, 1)
            end
            # A reply of 1 means this was the first increment of today's
            # counter, so this is the moment to attach the expiry.
            redis.expire(daily_failed, STATS_TIMEOUT) if replies.last == 1
          end
        end
        raise
      ensure
        retry_and_suppress_exceptions do
          Sidekiq.redis do |redis|
            daily_processed = "stat:processed:#{Time.now.utc.to_date}"
            replies = redis.multi do
              redis.hdel("#{identity}:workers", thread_identity)
              redis.incrby("stat:processed", 1)
              redis.incrby(daily_processed, 1)
            end
            # Same first-increment check as for the failed counter.
            redis.expire(daily_processed, STATS_TIMEOUT) if replies.last == 1
          end
        end
      end
    end

    # Returns a deep copy of the job arguments (via a Marshal round
    # trip).  The worker gets the copy, so if the job fails, whatever is
    # pushed back onto Redis has not been mutated by the worker.
    def cloned(ary)
      serialized = Marshal.dump(ary)
      Marshal.load(serialized)
    end

    # Runs the given block, retrying it when it raises a StandardError.
    # Between attempts the error is logged at debug level and the method
    # sleeps for one second.  After max_retries retries the last error is
    # handed to handle_exception instead of being raised, so errors from
    # the block never propagate to the caller.
    def retry_and_suppress_exceptions(max_retries = 2)
      failures = 0
      begin
        yield
      rescue => error
        failures += 1
        if failures > max_retries
          handle_exception(error, { :message => "Exhausted #{max_retries} retries"})
        else
          Sidekiq.logger.debug {"Suppressing and retrying error: #{error.inspect}"}
          sleep(1)
          retry
        end
      end
    end
  end
end