File: chain.rb

package info (click to toggle)
ruby-sidekiq 7.3.2%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 956 kB
  • sloc: ruby: 6,094; javascript: 526; makefile: 21; sh: 20
file content (207 lines) | stat: -rw-r--r-- 5,893 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# frozen_string_literal: true

require "sidekiq/middleware/modules"

module Sidekiq
  # Middleware is code configured to run before/after
  # a job is processed.  It is patterned after Rack
  # middleware. Middleware exists for the client side
  # (pushing jobs onto the queue) as well as the server
  # side (when jobs are actually processed).
  #
  # Callers will register middleware Classes and Sidekiq will
  # create new instances of the middleware for every job. This
  # is important so that instance state is not shared accidentally
  # between job executions.
  #
  # To add middleware for the client:
  #
  #   Sidekiq.configure_client do |config|
  #     config.client_middleware do |chain|
  #       chain.add MyClientHook
  #     end
  #   end
  #
  # To modify middleware for the server, just call
  # with another block:
  #
  #   Sidekiq.configure_server do |config|
  #     config.server_middleware do |chain|
  #       chain.add MyServerHook
  #       chain.remove ActiveRecord
  #     end
  #   end
  #
  # To insert immediately preceding another entry:
  #
  #   Sidekiq.configure_client do |config|
  #     config.client_middleware do |chain|
  #       chain.insert_before ActiveRecord, MyClientHook
  #     end
  #   end
  #
  # To insert immediately after another entry:
  #
  #   Sidekiq.configure_client do |config|
  #     config.client_middleware do |chain|
  #       chain.insert_after ActiveRecord, MyClientHook
  #     end
  #   end
  #
  # This is an example of a minimal server middleware:
  #
  #   class MyServerHook
  #     include Sidekiq::ServerMiddleware
  #
  #     def call(job_instance, msg, queue)
  #       logger.info "Before job"
  #       redis {|conn| conn.get("foo") } # do something in Redis
  #       yield
  #       logger.info "After job"
  #     end
  #   end
  #
  # This is an example of a minimal client middleware, note
  # the method must return the result or the job will not push
  # to Redis:
  #
  #   class MyClientHook
  #     include Sidekiq::ClientMiddleware
  #
  #     def call(job_class, msg, queue, redis_pool)
  #       logger.info "Before push"
  #       result = yield
  #       logger.info "After push"
  #       result
  #     end
  #   end
  #
  module Middleware
    class Chain
      include Enumerable

      # Iterate through each middleware in the chain
      def each(&block)
        entries.each(&block)
      end

      # @api private
      def initialize(config = nil) # :nodoc:
        @config = config
        @entries = nil
        yield self if block_given?
      end

      def entries
        @entries ||= []
      end

      def copy_for(capsule)
        chain = Sidekiq::Middleware::Chain.new(capsule)
        chain.instance_variable_set(:@entries, entries.dup)
        chain
      end

      # Remove all middleware matching the given Class
      # @param klass [Class]
      def remove(klass)
        entries.delete_if { |entry| entry.klass == klass }
      end

      # Add the given middleware to the end of the chain.
      # Sidekiq will call `klass.new(*args)` to create a clean
      # copy of your middleware for every job executed.
      #
      #   chain.add(Statsd::Metrics, { collector: "localhost:8125" })
      #
      # @param klass [Class] Your middleware class
      # @param *args [Array<Object>] Set of arguments to pass to every instance of your middleware
      def add(klass, *args)
        remove(klass)
        entries << Entry.new(@config, klass, *args)
      end

      # Identical to {#add} except the middleware is added to the front of the chain.
      def prepend(klass, *args)
        remove(klass)
        entries.insert(0, Entry.new(@config, klass, *args))
      end

      # Inserts +newklass+ before +oldklass+ in the chain.
      # Useful if one middleware must run before another middleware.
      def insert_before(oldklass, newklass, *args)
        i = entries.index { |entry| entry.klass == newklass }
        new_entry = i.nil? ? Entry.new(@config, newklass, *args) : entries.delete_at(i)
        i = entries.index { |entry| entry.klass == oldklass } || 0
        entries.insert(i, new_entry)
      end

      # Inserts +newklass+ after +oldklass+ in the chain.
      # Useful if one middleware must run after another middleware.
      def insert_after(oldklass, newklass, *args)
        i = entries.index { |entry| entry.klass == newklass }
        new_entry = i.nil? ? Entry.new(@config, newklass, *args) : entries.delete_at(i)
        i = entries.index { |entry| entry.klass == oldklass } || entries.count - 1
        entries.insert(i + 1, new_entry)
      end

      # @return [Boolean] if the given class is already in the chain
      def exists?(klass)
        any? { |entry| entry.klass == klass }
      end
      alias_method :include?, :exists?

      # @return [Boolean] if the chain contains no middleware
      def empty?
        @entries.nil? || @entries.empty?
      end

      def retrieve
        map(&:make_new)
      end

      def clear
        entries.clear
      end

      # Used by Sidekiq to execute the middleware at runtime
      # @api private
      def invoke(*args, &block)
        return yield if empty?

        chain = retrieve
        traverse(chain, 0, args, &block)
      end

      private

      def traverse(chain, index, args, &block)
        if index >= chain.size
          yield
        else
          chain[index].call(*args) do
            traverse(chain, index + 1, args, &block)
          end
        end
      end
    end

    # Represents each link in the middleware chain
    # @api private
    class Entry
      attr_reader :klass

      def initialize(config, klass, *args)
        @config = config
        @klass = klass
        @args = args
      end

      def make_new
        x = @klass.new(*@args)
        x.config = @config if @config && x.respond_to?(:config=)
        x
      end
    end
  end
end