# (removed: stray line-number gutter left over from text extraction)
require 'thread'
require 'concurrent/atomic/event'
require 'concurrent/concern/logging'
require 'concurrent/executor/ruby_executor_service'
require 'concurrent/utility/monotonic_time'
module Concurrent
# @!macro thread_pool_executor
# @!macro thread_pool_options
# @!visibility private
class RubyThreadPoolExecutor < RubyExecutorService
# @!macro thread_pool_executor_constant_default_max_pool_size
DEFAULT_MAX_POOL_SIZE = 2_147_483_647 # java.lang.Integer::MAX_VALUE
# @!macro thread_pool_executor_constant_default_min_pool_size
DEFAULT_MIN_POOL_SIZE = 0
# @!macro thread_pool_executor_constant_default_max_queue_size
DEFAULT_MAX_QUEUE_SIZE = 0
# @!macro thread_pool_executor_constant_default_thread_timeout
DEFAULT_THREAD_IDLETIMEOUT = 60
# @!macro thread_pool_executor_attr_reader_max_length
attr_reader :max_length
# @!macro thread_pool_executor_attr_reader_min_length
attr_reader :min_length
# @!macro thread_pool_executor_attr_reader_idletime
attr_reader :idletime
# @!macro thread_pool_executor_attr_reader_max_queue
attr_reader :max_queue
# @!macro thread_pool_executor_method_initialize
def initialize(opts = {})
super(opts)
end
# @!macro thread_pool_executor_attr_reader_largest_length
def largest_length
synchronize { @largest_length }
end
# @!macro thread_pool_executor_attr_reader_scheduled_task_count
def scheduled_task_count
synchronize { @scheduled_task_count }
end
# @!macro thread_pool_executor_attr_reader_completed_task_count
def completed_task_count
synchronize { @completed_task_count }
end
# @!macro executor_service_method_can_overflow_question
def can_overflow?
synchronize { ns_limited_queue? }
end
# @!macro thread_pool_executor_attr_reader_length
def length
synchronize { @pool.length }
end
# @!macro thread_pool_executor_attr_reader_queue_length
def queue_length
synchronize { @queue.length }
end
# @!macro thread_pool_executor_attr_reader_remaining_capacity
def remaining_capacity
synchronize do
if ns_limited_queue?
@max_queue - @queue.length
else
-1
end
end
end
# @!visibility private
def remove_busy_worker(worker)
synchronize { ns_remove_busy_worker worker }
end
# @!visibility private
def ready_worker(worker)
synchronize { ns_ready_worker worker }
end
# @!visibility private
def worker_not_old_enough(worker)
synchronize { ns_worker_not_old_enough worker }
end
# @!visibility private
def worker_died(worker)
synchronize { ns_worker_died worker }
end
# @!visibility private
def worker_task_completed
synchronize { @completed_task_count += 1 }
end
private
# @!visibility private
def ns_initialize(opts)
@min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
@max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
@idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
@max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
@fallback_policy = opts.fetch(:fallback_policy, :abort)
raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICIES.include?(@fallback_policy)
raise ArgumentError.new("`max_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if @max_length < DEFAULT_MIN_POOL_SIZE
raise ArgumentError.new("`max_threads` cannot be greater than #{DEFAULT_MAX_POOL_SIZE}") if @max_length > DEFAULT_MAX_POOL_SIZE
raise ArgumentError.new("`min_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if @min_length < DEFAULT_MIN_POOL_SIZE
raise ArgumentError.new("`min_threads` cannot be more than `max_threads`") if min_length > max_length
@pool = [] # all workers
@ready = [] # used as a stash (most idle worker is at the start)
@queue = [] # used as queue
# @ready or @queue is empty at all times
@scheduled_task_count = 0
@completed_task_count = 0
@largest_length = 0
@workers_counter = 0
@ruby_pid = $$ # detects if Ruby has forked
@gc_interval = opts.fetch(:gc_interval, @idletime / 2.0).to_i # undocumented
@next_gc_time = Concurrent.monotonic_time + @gc_interval
end
# @!visibility private
def ns_limited_queue?
@max_queue != 0
end
# @!visibility private
def ns_execute(*args, &task)
ns_reset_if_forked
if ns_assign_worker(*args, &task) || ns_enqueue(*args, &task)
@scheduled_task_count += 1
else
handle_fallback(*args, &task)
end
ns_prune_pool if @next_gc_time < Concurrent.monotonic_time
end
# @!visibility private
def ns_shutdown_execution
ns_reset_if_forked
if @pool.empty?
# nothing to do
stopped_event.set
end
if @queue.empty?
# no more tasks will be accepted, just stop all workers
@pool.each(&:stop)
end
end
# @!visibility private
def ns_kill_execution
# TODO log out unprocessed tasks in queue
# TODO try to shutdown first?
@pool.each(&:kill)
@pool.clear
@ready.clear
end
# tries to assign task to a worker, tries to get one from @ready or to create new one
# @return [true, false] if task is assigned to a worker
#
# @!visibility private
def ns_assign_worker(*args, &task)
# keep growing if the pool is not at the minimum yet
worker = (@ready.pop if @pool.size >= @min_length) || ns_add_busy_worker
if worker
worker << [task, args]
true
else
false
end
rescue ThreadError
# Raised when the operating system refuses to create the new thread
return false
end
# tries to enqueue task
# @return [true, false] if enqueued
#
# @!visibility private
def ns_enqueue(*args, &task)
if !ns_limited_queue? || @queue.size < @max_queue
@queue << [task, args]
true
else
false
end
end
# @!visibility private
def ns_worker_died(worker)
ns_remove_busy_worker worker
replacement_worker = ns_add_busy_worker
ns_ready_worker replacement_worker, false if replacement_worker
end
# creates new worker which has to receive work to do after it's added
# @return [nil, Worker] nil of max capacity is reached
#
# @!visibility private
def ns_add_busy_worker
return if @pool.size >= @max_length
@workers_counter += 1
@pool << (worker = Worker.new(self, @workers_counter))
@largest_length = @pool.length if @pool.length > @largest_length
worker
end
# handle ready worker, giving it new job or assigning back to @ready
#
# @!visibility private
def ns_ready_worker(worker, success = true)
task_and_args = @queue.shift
if task_and_args
worker << task_and_args
else
# stop workers when !running?, do not return them to @ready
if running?
@ready.push(worker)
else
worker.stop
end
end
end
# returns back worker to @ready which was not idle for enough time
#
# @!visibility private
def ns_worker_not_old_enough(worker)
# let's put workers coming from idle_test back to the start (as the oldest worker)
@ready.unshift(worker)
true
end
# removes a worker which is not in not tracked in @ready
#
# @!visibility private
def ns_remove_busy_worker(worker)
@pool.delete(worker)
stopped_event.set if @pool.empty? && !running?
true
end
# try oldest worker if it is idle for enough time, it's returned back at the start
#
# @!visibility private
def ns_prune_pool
return if @pool.size <= @min_length
last_used = @ready.shift
last_used << :idle_test if last_used
@next_gc_time = Concurrent.monotonic_time + @gc_interval
end
def ns_reset_if_forked
if $$ != @ruby_pid
@queue.clear
@ready.clear
@pool.clear
@scheduled_task_count = 0
@completed_task_count = 0
@largest_length = 0
@workers_counter = 0
@ruby_pid = $$
end
end
# @!visibility private
class Worker
include Concern::Logging
def initialize(pool, id)
# instance variables accessed only under pool's lock so no need to sync here again
@queue = Queue.new
@pool = pool
@thread = create_worker @queue, pool, pool.idletime
if @thread.respond_to?(:name=)
@thread.name = [pool.name, 'worker', id].compact.join('-')
end
end
def <<(message)
@queue << message
end
def stop
@queue << :stop
end
def kill
@thread.kill
end
private
def create_worker(queue, pool, idletime)
Thread.new(queue, pool, idletime) do |my_queue, my_pool, my_idletime|
last_message = Concurrent.monotonic_time
catch(:stop) do
loop do
case message = my_queue.pop
when :idle_test
if (Concurrent.monotonic_time - last_message) > my_idletime
my_pool.remove_busy_worker(self)
throw :stop
else
my_pool.worker_not_old_enough(self)
end
when :stop
my_pool.remove_busy_worker(self)
throw :stop
else
task, args = message
run_task my_pool, task, args
last_message = Concurrent.monotonic_time
my_pool.ready_worker(self)
end
end
end
end
end
def run_task(pool, task, args)
task.call(*args)
pool.worker_task_completed
rescue => ex
# let it fail
log DEBUG, ex
rescue Exception => ex
log ERROR, ex
pool.worker_died(self)
throw :stop
end
end
private_constant :Worker
end
end