1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
|
if Concurrent.on_jruby?
require 'concurrent/executor/java_executor_service'
module Concurrent
# @!macro thread_pool_executor
# @!macro thread_pool_options
# @!visibility private
class JavaThreadPoolExecutor < JavaExecutorService
include Concern::Deprecation
# @!macro thread_pool_executor_constant_default_max_pool_size
DEFAULT_MAX_POOL_SIZE = java.lang.Integer::MAX_VALUE # 2147483647
# @!macro thread_pool_executor_constant_default_min_pool_size
DEFAULT_MIN_POOL_SIZE = 0
# @!macro thread_pool_executor_constant_default_max_queue_size
DEFAULT_MAX_QUEUE_SIZE = 0
# @!macro thread_pool_executor_constant_default_thread_timeout
DEFAULT_THREAD_IDLETIMEOUT = 60
# @!macro thread_pool_executor_constant_default_synchronous
DEFAULT_SYNCHRONOUS = false
# @!macro thread_pool_executor_attr_reader_max_length
attr_reader :max_length
# @!macro thread_pool_executor_attr_reader_max_queue
attr_reader :max_queue
# @!macro thread_pool_executor_attr_reader_synchronous
attr_reader :synchronous
# @!macro thread_pool_executor_method_initialize
def initialize(opts = {})
super(opts)
end
# @!macro executor_service_method_can_overflow_question
def can_overflow?
@max_queue != 0
end
# @!macro thread_pool_executor_attr_reader_min_length
def min_length
@executor.getCorePoolSize
end
# @!macro thread_pool_executor_attr_reader_max_length
def max_length
@executor.getMaximumPoolSize
end
# @!macro thread_pool_executor_attr_reader_length
def length
@executor.getPoolSize
end
# @!macro thread_pool_executor_attr_reader_largest_length
def largest_length
@executor.getLargestPoolSize
end
# @!macro thread_pool_executor_attr_reader_scheduled_task_count
def scheduled_task_count
@executor.getTaskCount
end
# @!macro thread_pool_executor_attr_reader_completed_task_count
def completed_task_count
@executor.getCompletedTaskCount
end
# @!macro thread_pool_executor_method_active_count
def active_count
@executor.getActiveCount
end
# @!macro thread_pool_executor_attr_reader_idletime
def idletime
@executor.getKeepAliveTime(java.util.concurrent.TimeUnit::SECONDS)
end
# @!macro thread_pool_executor_attr_reader_queue_length
def queue_length
@executor.getQueue.size
end
# @!macro thread_pool_executor_attr_reader_remaining_capacity
def remaining_capacity
@max_queue == 0 ? -1 : @executor.getQueue.remainingCapacity
end
# @!macro executor_service_method_running_question
def running?
super && !@executor.isTerminating
end
# @!macro thread_pool_executor_method_prune_pool
def prune_pool
deprecated "#prune_pool has no effect and will be removed in the next release."
end
private
def ns_initialize(opts)
min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
@max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
@synchronous = opts.fetch(:synchronous, DEFAULT_SYNCHRONOUS)
@fallback_policy = opts.fetch(:fallback_policy, :abort)
raise ArgumentError.new("`synchronous` cannot be set unless `max_queue` is 0") if @synchronous && @max_queue > 0
raise ArgumentError.new("`max_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if max_length < DEFAULT_MIN_POOL_SIZE
raise ArgumentError.new("`max_threads` cannot be greater than #{DEFAULT_MAX_POOL_SIZE}") if max_length > DEFAULT_MAX_POOL_SIZE
raise ArgumentError.new("`min_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if min_length < DEFAULT_MIN_POOL_SIZE
raise ArgumentError.new("`min_threads` cannot be more than `max_threads`") if min_length > max_length
raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICY_CLASSES.include?(@fallback_policy)
if @max_queue == 0
if @synchronous
queue = java.util.concurrent.SynchronousQueue.new
else
queue = java.util.concurrent.LinkedBlockingQueue.new
end
else
queue = java.util.concurrent.LinkedBlockingQueue.new(@max_queue)
end
@executor = java.util.concurrent.ThreadPoolExecutor.new(
min_length,
max_length,
idletime,
java.util.concurrent.TimeUnit::SECONDS,
queue,
DaemonThreadFactory.new(ns_auto_terminate?),
FALLBACK_POLICY_CLASSES[@fallback_policy].new)
end
end
end
end
|