1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
|
module Concurrent
# @!macro [new] throttle.example.throttled_block
# @example
# max_two = Throttle.new 2
# 10.times.map do
# Thread.new do
# max_two.throttled_block do
# # Only 2 at the same time
# do_stuff
# end
# end
# end
# @!macro [new] throttle.example.throttled_future
# @example
# throttle.throttled_future(1) do |arg|
# arg.succ
# end
# @!macro [new] throttle.example.throttled_future_chain
# @example
# throttle.throttled_future_chain do |trigger|
# trigger.
# # 2 throttled promises
# chain { 1 }.
# then(&:succ)
# end
# @!macro [new] throttle.example.then_throttled_by
# @example
# data = (1..5).to_a
# db = data.reduce({}) { |h, v| h.update v => v.to_s }
# max_two = Throttle.new 2
#
# futures = data.map do |data|
# Promises.future(data) do |data|
# # un-throttled, concurrency level equal data.size
# data + 1
# end.then_throttled_by(max_two, db) do |v, db|
# # throttled, only 2 tasks executed at the same time
# # e.g. limiting access to db
# db[v]
# end
# end
#
# futures.map(&:value!) # => [2, 3, 4, 5, nil]
# A tool manage concurrency level of future tasks.
#
# @!macro throttle.example.then_throttled_by
# @!macro throttle.example.throttled_future
# @!macro throttle.example.throttled_future_chain
# @!macro throttle.example.throttled_block
class Throttle < Synchronization::Object
# TODO (pitr-ch 21-Dec-2016): consider using sized channel for implementation instead when available
safe_initialization!
private *attr_atomic(:can_run)
# New throttle.
# @param [Integer] limit
def initialize(limit)
super()
@Limit = limit
self.can_run = limit
@Queue = LockFreeQueue.new
end
# @return [Integer] The limit.
def limit
@Limit
end
# New event which will be resolved when depending tasks can execute.
# Has to be used and after the critical work is done {#release} must be called exactly once.
# @return [Promises::Event]
# @see #release
def trigger
while true
current_can_run = can_run
if compare_and_set_can_run current_can_run, current_can_run - 1
if current_can_run > 0
return Promises.resolved_event
else
event = Promises.resolvable_event
@Queue.push event
return event
end
end
end
end
# Has to be called once for each trigger after it is ok to execute another throttled task.
# @return [self]
# @see #trigger
def release
while true
current_can_run = can_run
if compare_and_set_can_run current_can_run, current_can_run + 1
if current_can_run < 0
Thread.pass until (trigger = @Queue.pop)
trigger.resolve
end
return self
end
end
end
# Blocks current thread until the block can be executed.
# @yield to throttled block
# @yieldreturn [Object] is used as a result of the method
# @return [Object] the result of the block
# @!macro throttle.example.throttled_block
def throttled_block(&block)
trigger.wait
block.call
ensure
release
end
# @return [String] Short string representation.
def to_s
format '<#%s:0x%x limit:%s can_run:%d>', self.class, object_id << 1, @Limit, can_run
end
alias_method :inspect, :to_s
module PromisesIntegration
# Allows to throttle a chain of promises.
# @yield [trigger] a trigger which has to be used to build up a chain of promises, the last one is result
# of the block. When the last one resolves, {Throttle#release} is called on the throttle.
# @yieldparam [Promises::Event, Promises::Future] trigger
# @yieldreturn [Promises::Event, Promises::Future] The final future of the throttled chain.
# @return [Promises::Event, Promises::Future] The final future of the throttled chain.
# @!macro throttle.example.throttled_future_chain
def throttled_future_chain(&throttled_futures)
throttled_futures.call(trigger).on_resolution! { release }
end
# Behaves as {Promises::FactoryMethods#future} but the future is throttled.
# @return [Promises::Future]
# @see Promises::FactoryMethods#future
# @!macro throttle.example.throttled_future
def throttled_future(*args, &task)
trigger.chain(*args, &task).on_resolution! { release }
end
end
include PromisesIntegration
end
module Promises
class AbstractEventFuture < Synchronization::Object
module ThrottleIntegration
def throttled_by(throttle, &throttled_futures)
a_trigger = self & self.chain { throttle.trigger }.flat_event
throttled_futures.call(a_trigger).on_resolution! { throttle.release }
end
# Behaves as {Promises::AbstractEventFuture#chain} but the it is throttled.
# @return [Promises::Future, Promises::Event]
# @see Promises::AbstractEventFuture#chain
def chain_throttled_by(throttle, *args, &block)
throttled_by(throttle) { |trigger| trigger.chain(*args, &block) }
end
end
include ThrottleIntegration
end
class Future < AbstractEventFuture
module ThrottleIntegration
# Behaves as {Promises::Future#then} but the it is throttled.
# @return [Promises::Future]
# @see Promises::Future#then
# @!macro throttle.example.then_throttled_by
def then_throttled_by(throttle, *args, &block)
throttled_by(throttle) { |trigger| trigger.then(*args, &block) }
end
# Behaves as {Promises::Future#rescue} but the it is throttled.
# @return [Promises::Future]
# @see Promises::Future#rescue
def rescue_throttled_by(throttle, *args, &block)
throttled_by(throttle) { |trigger| trigger.rescue(*args, &block) }
end
end
include ThrottleIntegration
end
end
end
|