File: interrupted_set.rb

package info (click to toggle)
ruby-gitlab-sidekiq-fetcher 0.9.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 204 kB
  • sloc: ruby: 760; makefile: 3
file content (51 lines) | stat: -rw-r--r-- 1,157 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
require 'sidekiq/api'

module Sidekiq
  class InterruptedSet < ::Sidekiq::JobSet
    DEFAULT_MAX_CAPACITY = 10_000
    DEFAULT_MAX_TIMEOUT = 90 * 24 * 60 * 60 # 3 months

    def initialize
      super "interrupted"
    end

    def put(message, opts = {})
      now = Time.now.to_f

      with_multi_connection(opts[:connection]) do |conn|
        conn.zadd(name, now.to_s, message)
        conn.zremrangebyscore(name, '-inf', now - self.class.timeout)
        conn.zremrangebyrank(name, 0, - self.class.max_jobs)
      end

      true
    end

    # Yield block inside an existing multi connection or creates new one
    def with_multi_connection(conn, &block)
      return yield(conn) if conn

      Sidekiq.redis do |c|
        c.multi do |multi|
          yield(multi)
        end
      end
    end

    def retry_all
      each(&:retry) while size > 0
    end

    def self.max_jobs
      options[:interrupted_max_jobs] || DEFAULT_MAX_CAPACITY
    end

    def self.timeout
      options[:interrupted_timeout_in_seconds] || DEFAULT_MAX_TIMEOUT
    end

    def self.options
      Sidekiq.respond_to?(:[]) ? Sidekiq : Sidekiq.options
    end
  end
end