File: java_thread_pool_executor.rb

package info (click to toggle)
ruby-concurrent 1.3.6-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 4,152 kB
  • sloc: ruby: 30,953; java: 6,128; ansic: 293; makefile: 26; sh: 19
file content (147 lines) | stat: -rw-r--r-- 4,990 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
if Concurrent.on_jruby?

  require 'concurrent/executor/java_executor_service'

  module Concurrent

    # @!macro thread_pool_executor
    # @!macro thread_pool_options
    # @!visibility private
    class JavaThreadPoolExecutor < JavaExecutorService
      include Concern::Deprecation

      # @!macro thread_pool_executor_constant_default_max_pool_size
      DEFAULT_MAX_POOL_SIZE = java.lang.Integer::MAX_VALUE # 2147483647

      # @!macro thread_pool_executor_constant_default_min_pool_size
      DEFAULT_MIN_POOL_SIZE = 0

      # @!macro thread_pool_executor_constant_default_max_queue_size
      DEFAULT_MAX_QUEUE_SIZE = 0

      # @!macro thread_pool_executor_constant_default_thread_timeout
      DEFAULT_THREAD_IDLETIMEOUT = 60

      # @!macro thread_pool_executor_constant_default_synchronous
      DEFAULT_SYNCHRONOUS = false

      # @!macro thread_pool_executor_attr_reader_max_length
      attr_reader :max_length

      # @!macro thread_pool_executor_attr_reader_max_queue
      attr_reader :max_queue

      # @!macro thread_pool_executor_attr_reader_synchronous
      attr_reader :synchronous

      # @!macro thread_pool_executor_method_initialize
      def initialize(opts = {})
        super(opts)
      end

      # @!macro executor_service_method_can_overflow_question
      def can_overflow?
        @max_queue != 0
      end

      # @!macro thread_pool_executor_attr_reader_min_length
      def min_length
        @executor.getCorePoolSize
      end

      # @!macro thread_pool_executor_attr_reader_max_length
      def max_length
        @executor.getMaximumPoolSize
      end

      # @!macro thread_pool_executor_attr_reader_length
      def length
        @executor.getPoolSize
      end

      # @!macro thread_pool_executor_attr_reader_largest_length
      def largest_length
        @executor.getLargestPoolSize
      end

      # @!macro thread_pool_executor_attr_reader_scheduled_task_count
      def scheduled_task_count
        @executor.getTaskCount
      end

      # @!macro thread_pool_executor_attr_reader_completed_task_count
      def completed_task_count
        @executor.getCompletedTaskCount
      end

      # @!macro thread_pool_executor_method_active_count
      def active_count
        @executor.getActiveCount
      end

      # @!macro thread_pool_executor_attr_reader_idletime
      def idletime
        @executor.getKeepAliveTime(java.util.concurrent.TimeUnit::SECONDS)
      end

      # @!macro thread_pool_executor_attr_reader_queue_length
      def queue_length
        @executor.getQueue.size
      end

      # @!macro thread_pool_executor_attr_reader_remaining_capacity
      def remaining_capacity
        @max_queue == 0 ? -1 : @executor.getQueue.remainingCapacity
      end

      # @!macro executor_service_method_running_question
      def running?
        super && !@executor.isTerminating
      end

      # @!macro thread_pool_executor_method_prune_pool
      def prune_pool
        deprecated "#prune_pool has no effect and will be removed in the next release."
      end

      private

      def ns_initialize(opts)
        min_length       = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
        max_length       = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
        idletime         = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
        @max_queue       = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
        @synchronous     = opts.fetch(:synchronous, DEFAULT_SYNCHRONOUS)
        @fallback_policy = opts.fetch(:fallback_policy, :abort)

        raise ArgumentError.new("`synchronous` cannot be set unless `max_queue` is 0") if @synchronous && @max_queue > 0
        raise ArgumentError.new("`max_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if max_length < DEFAULT_MIN_POOL_SIZE
        raise ArgumentError.new("`max_threads` cannot be greater than #{DEFAULT_MAX_POOL_SIZE}") if max_length > DEFAULT_MAX_POOL_SIZE
        raise ArgumentError.new("`min_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if min_length < DEFAULT_MIN_POOL_SIZE
        raise ArgumentError.new("`min_threads` cannot be more than `max_threads`") if min_length > max_length
        raise ArgumentError.new("#{fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICY_CLASSES.include?(@fallback_policy)

        if @max_queue == 0
          if @synchronous
            queue = java.util.concurrent.SynchronousQueue.new
          else
            queue = java.util.concurrent.LinkedBlockingQueue.new
          end
        else
          queue = java.util.concurrent.LinkedBlockingQueue.new(@max_queue)
        end

        @executor = java.util.concurrent.ThreadPoolExecutor.new(
            min_length,
            max_length,
            idletime,
            java.util.concurrent.TimeUnit::SECONDS,
            queue,
            DaemonThreadFactory.new(ns_auto_terminate?),
            FALLBACK_POLICY_CLASSES[@fallback_policy].new)

      end
    end

  end
end