1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225
|
module Concurrent
# A tool managing concurrency level of tasks.
# The maximum capacity is set in constructor.
# Each acquire will lower the available capacity and release will increase it.
# When there is no available capacity the current thread may either be blocked or
# an event is returned which will be resolved when capacity becomes available.
#
# The more common usage of the Throttle is with a proxy executor
# `a_throttle.on(Concurrent.global_io_executor)`.
# Anything executed on the proxy executor will be throttled and
# execute on the given executor. There can be more than one proxy executors.
# All abstractions which execute tasks have option to specify executor,
# therefore the proxy executor can be injected to any abstraction
# throttling its concurrency level.
#
# {include:file:docs-source/throttle.out.md}
#
# @!macro warn.edge
class Throttle < Synchronization::Object
safe_initialization!
attr_atomic(:capacity)
private :capacity, :capacity=, :swap_capacity, :compare_and_set_capacity, :update_capacity
# @return [Integer] The available capacity.
def available_capacity
current_capacity = capacity
current_capacity >= 0 ? current_capacity : 0
end
# Create throttle.
# @param [Integer] capacity How many tasks using this throttle can run at the same time.
def initialize(capacity)
super()
@MaxCapacity = capacity
@Queue = LockFreeQueue.new
@executor_cache = [nil, nil]
self.capacity = capacity
end
# @return [Integer] The maximum capacity.
def max_capacity
@MaxCapacity
end
# Blocks current thread until there is capacity available in the throttle.
# The acquired capacity has to be returned to the throttle by calling {#release}.
# If block is passed then the block is called after the capacity is acquired and
# it is automatically released after the block is executed.
#
# @param [Numeric] timeout the maximum time in second to wait.
# @yield [] block to execute after the capacity is acquired
# @return [Object, self, true, false]
# * When no timeout and no block it returns self
# * When no timeout and with block it returns the result of the block
# * When with timeout and no block it returns true when acquired and false when timed out
# * When with timeout and with block it returns the result of the block of nil on timing out
# @see #release
def acquire(timeout = nil, &block)
event = acquire_or_event
if event
within_timeout = event.wait(timeout)
# release immediately when acquired later after the timeout since it is unused
event.on_resolution!(self, &:release) unless within_timeout
else
within_timeout = true
end
called = false
if timeout
if block
if within_timeout
called = true
block.call
else
nil
end
else
within_timeout
end
else
if block
called = true
block.call
else
self
end
end
ensure
release if called
end
# Tries to acquire capacity from the throttle.
# Returns true when there is capacity available.
# The acquired capacity has to be returned to the throttle by calling {#release}.
# @return [true, false]
# @see #release
def try_acquire
while true
current_capacity = capacity
if current_capacity > 0
return true if compare_and_set_capacity(
current_capacity, current_capacity - 1)
else
return false
end
end
end
# Releases previously acquired capacity back to Throttle.
# Has to be called exactly once for each acquired capacity.
# @return [self]
# @see #acquire_operation, #acquire, #try_acquire
def release
while true
current_capacity = capacity
if compare_and_set_capacity current_capacity, current_capacity + 1
if current_capacity < 0
# release called after trigger which pushed a trigger, busy wait is ok
Thread.pass until (trigger = @Queue.pop)
trigger.resolve
end
return self
end
end
end
# @return [String] Short string representation.
def to_s
format '%s capacity available %d of %d>', super[0..-2], capacity, @MaxCapacity
end
alias_method :inspect, :to_s
# @!visibility private
def acquire_or_event
while true
current_capacity = capacity
if compare_and_set_capacity current_capacity, current_capacity - 1
if current_capacity > 0
return nil
else
event = Promises.resolvable_event
@Queue.push event
return event
end
end
end
end
include Promises::FactoryMethods
# @param [ExecutorService] executor
# @return [ExecutorService] An executor which wraps given executor and allows to post tasks only
# as available capacity in the throttle allows.
# @example throttling future
# a_future.then_on(a_throttle.on(:io)) { a_throttled_task }
def on(executor = Promises::FactoryMethods.default_executor)
current_executor, current_cache = @executor_cache
return current_cache if current_executor == executor && current_cache
if current_executor.nil?
# cache first proxy
proxy_executor = ProxyExecutor.new(self, Concurrent.executor(executor))
@executor_cache = [executor, proxy_executor]
return proxy_executor
else
# do not cache more than 1 executor
ProxyExecutor.new(self, Concurrent.executor(executor))
end
end
# Uses executor provided by {#on} therefore
# all events and futures created using factory methods on this object will be throttled.
# Overrides {Promises::FactoryMethods#default_executor}.
#
# @return [ExecutorService]
# @see Promises::FactoryMethods#default_executor
def default_executor
on(super)
end
class ProxyExecutor < Synchronization::Object
safe_initialization!
include ExecutorService
def initialize(throttle, executor)
super()
@Throttle = throttle
@Executor = executor
end
def post(*args, &task)
if (event = @Throttle.acquire_or_event)
event.on_resolution! { inner_post(*args, &task) }
else
inner_post(*args, &task)
end
end
def can_overflow?
@Executor.can_overflow?
end
def serialized?
@Executor.serialized?
end
private
def inner_post(*arguments, &task)
@Executor.post(*arguments) do |*args|
begin
task.call(*args)
ensure
@Throttle.release
end
end
end
end
private_constant :ProxyExecutor
end
end
|