File: throttle.rb

module Concurrent
  # A tool for managing the concurrency level of tasks.
  # The maximum capacity is set in the constructor.
  # Each acquire lowers the available capacity and each release increases it.
  # When there is no capacity available, the current thread is either blocked or
  # an event is returned which will be resolved when capacity becomes available.
  #
  # The more common usage of the Throttle is with a proxy executor,
  # e.g. `a_throttle.on(Concurrent.global_io_executor)`.
  # Anything posted to the proxy executor is throttled and
  # executed on the given executor. There can be more than one proxy executor.
  # All abstractions which execute tasks accept an executor option,
  # therefore a proxy executor can be injected into any abstraction
  # to throttle its concurrency level.
  #
  # {include:file:docs-source/throttle.out.md}
  #
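  # The example below is an illustrative sketch added for clarity (not part of
  # the original docs); it assumes the global IO executor is a suitable
  # underlying executor.
  #
  # @example Limiting concurrency with a proxy executor
  #   max_two = Concurrent::Throttle.new 2
  #   # Tasks posted through the proxy run on the global IO executor,
  #   # but at most 2 of them run at the same time.
  #   futures = 10.times.map do |i|
  #     Concurrent::Promises.future_on(max_two.on(Concurrent.global_io_executor), i) do |value|
  #       value * 2
  #     end
  #   end
  #   Concurrent::Promises.zip(*futures).value! # => [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
  #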
  # @!macro warn.edge
  class Throttle < Synchronization::Object
    safe_initialization!

    attr_atomic(:capacity)
    private :capacity, :capacity=, :swap_capacity, :compare_and_set_capacity, :update_capacity

    # @return [Integer] The available capacity.
    def available_capacity
      current_capacity = capacity
      current_capacity >= 0 ? current_capacity : 0
    end

    # Create a throttle.
    # @param [Integer] capacity How many tasks using this throttle can run at the same time.
    def initialize(capacity)
      super()
      @MaxCapacity            = capacity
      @Queue                  = LockFreeQueue.new
      @executor_cache         = [nil, nil]
      self.capacity = capacity
    end

    # @return [Integer] The maximum capacity.
    def max_capacity
      @MaxCapacity
    end

    # Blocks the current thread until there is capacity available in the throttle.
    # The acquired capacity has to be returned to the throttle by calling {#release}.
    # If a block is passed, it is called after the capacity is acquired and
    # the capacity is automatically released after the block is executed.
    #
    # @param [Numeric] timeout the maximum time in seconds to wait.
    # @yield [] the block to execute after the capacity is acquired
    # @return [Object, self, true, false]
    #   * Without a timeout and without a block it returns self
    #   * Without a timeout and with a block it returns the result of the block
    #   * With a timeout and without a block it returns true when acquired and false when timed out
    #   * With a timeout and with a block it returns the result of the block, or nil on timing out
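    #
    # An illustrative sketch added here (not part of the original docs), assuming
    # a throttle with capacity 1:
    # @example Acquiring capacity directly
    #   throttle = Concurrent::Throttle.new 1
    #   throttle.acquire { :work }       # => :work, capacity is released automatically
    #   throttle.acquire                 # => the throttle itself, #release must be called later
    #   throttle.release                 # => the throttle itself
    #   throttle.acquire(0.01) { :work } # => :work, or nil if capacity is not acquired in 0.01 s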
    # @see #release
    def acquire(timeout = nil, &block)
      event = acquire_or_event
      if event
        within_timeout = event.wait(timeout)
        # if the capacity is acquired later, after the timeout, release it immediately since it is unused
        event.on_resolution!(self, &:release) unless within_timeout
      else
        within_timeout = true
      end

      called = false
      if timeout
        if block
          if within_timeout
            called = true
            block.call
          else
            nil
          end
        else
          within_timeout
        end
      else
        if block
          called = true
          block.call
        else
          self
        end
      end
    ensure
      release if called
    end

    # Tries to acquire capacity from the throttle.
    # Returns true when there is capacity available.
    # The acquired capacity has to be returned to the throttle by calling {#release}.
    # @return [true, false]
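    #
    # A minimal sketch added for clarity (not part of the original docs);
    # `do_work` is a placeholder for the throttled work:
    # @example Non-blocking acquisition
    #   throttle = Concurrent::Throttle.new 1
    #   if throttle.try_acquire
    #     begin
    #       do_work
    #     ensure
    #       throttle.release
    #     end
    #   end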
    # @see #release
    def try_acquire
      while true
        current_capacity = capacity
        if current_capacity > 0
          return true if compare_and_set_capacity(
              current_capacity, current_capacity - 1)
        else
          return false
        end
      end
    end

    # Releases previously acquired capacity back to the throttle.
    # Has to be called exactly once for each acquired capacity.
    # @return [self]
    # @see #acquire, #try_acquire
    def release
      while true
        current_capacity = capacity
        if compare_and_set_capacity current_capacity, current_capacity + 1
          if current_capacity < 0
            # The capacity was negative, so a waiting acquirer has pushed (or is about to push)
            # a trigger event onto the queue; a busy wait for it is ok here.
            Thread.pass until (trigger = @Queue.pop)
            trigger.resolve
          end
          return self
        end
      end
    end

    # @return [String] Short string representation.
    def to_s
      format '%s capacity available %d of %d>', super[0..-2], capacity, @MaxCapacity
    end

    alias_method :inspect, :to_s

    # @!visibility private
    def acquire_or_event
      while true
        current_capacity = capacity
        if compare_and_set_capacity current_capacity, current_capacity - 1
          if current_capacity > 0
            return nil
          else
            event = Promises.resolvable_event
            @Queue.push event
            return event
          end
        end
      end
    end

    include Promises::FactoryMethods

    # @param [ExecutorService] executor
    # @return [ExecutorService] An executor which wraps the given executor and allows tasks
    #   to be posted only as the available capacity in the throttle allows.
    # @example throttling future
    #   a_future.then_on(a_throttle.on(:io)) { a_throttled_task }
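    # @example Posting directly to the proxy executor (illustrative sketch, not from the original docs)
    #   proxy = a_throttle.on(Concurrent.global_fast_executor)
    #   proxy.post { a_throttled_task }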
    def on(executor = Promises::FactoryMethods.default_executor)
      current_executor, current_cache = @executor_cache
      return current_cache if current_executor == executor && current_cache

      if current_executor.nil?
        # cache first proxy
        proxy_executor  = ProxyExecutor.new(self, Concurrent.executor(executor))
        @executor_cache = [executor, proxy_executor]
        return proxy_executor
      else
        # do not cache more than 1 executor
        ProxyExecutor.new(self, Concurrent.executor(executor))
      end
    end

    # Uses the executor provided by {#on}, therefore
    # all events and futures created using factory methods on this object will be throttled.
    # Overrides {Promises::FactoryMethods#default_executor}.
    #
    # @return [ExecutorService]
    # @see Promises::FactoryMethods#default_executor
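    #
    # A hedged sketch added for clarity (not from the original docs): since the class
    # includes {Promises::FactoryMethods}, futures can be created directly on the
    # throttle and they will use the throttled default executor.
    # @example Creating a throttled future directly on the throttle
    #   max_two = Concurrent::Throttle.new 2
    #   max_two.future { :throttled_task }.value! # => :throttled_task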
    def default_executor
      on(super)
    end

    class ProxyExecutor < Synchronization::Object
      safe_initialization!

      include ExecutorService

      def initialize(throttle, executor)
        super()
        @Throttle = throttle
        @Executor = executor
      end

      def post(*args, &task)
        if (event = @Throttle.acquire_or_event)
          event.on_resolution! { inner_post(*args, &task) }
        else
          inner_post(*args, &task)
        end
      end

      def can_overflow?
        @Executor.can_overflow?
      end

      def serialized?
        @Executor.serialized?
      end

      private

      def inner_post(*arguments, &task)
        @Executor.post(*arguments) do |*args|
          begin
            task.call(*args)
          ensure
            @Throttle.release
          end
        end
      end
    end

    private_constant :ProxyExecutor
  end
end