require 'concurrent/errors'
require 'concurrent/executor/executor_service'
require 'concurrent/synchronization'
require 'concurrent/utility/at_exit'

module Concurrent

  # @!macro abstract_executor_service_public_api
  # @!visibility private
  class AbstractExecutorService < Synchronization::LockableObject
    include ExecutorService

    # The set of possible fallback policies that may be set at thread pool creation.
    FALLBACK_POLICIES = [:abort, :discard, :caller_runs].freeze

    # @!macro executor_service_attr_reader_fallback_policy
    attr_reader :fallback_policy

    # Create a new thread pool.
    def initialize(*args, &block)
      super(&nil)
      synchronize { ns_initialize(*args, &block) }
    end

    # @!macro executor_service_method_shutdown
    def shutdown
      raise NotImplementedError
    end

    # @!macro executor_service_method_kill
    def kill
      raise NotImplementedError
    end

    # @!macro executor_service_method_wait_for_termination
    def wait_for_termination(timeout = nil)
      raise NotImplementedError
    end

    # @!macro executor_service_method_running_question
    def running?
      synchronize { ns_running? }
    end

    # @!macro executor_service_method_shuttingdown_question
    def shuttingdown?
      synchronize { ns_shuttingdown? }
    end

    # @!macro executor_service_method_shutdown_question
    def shutdown?
      synchronize { ns_shutdown? }
    end

    # @!macro executor_service_method_auto_terminate_question
    def auto_terminate?
      synchronize { ns_auto_terminate? }
    end

    # @!macro executor_service_method_auto_terminate_setter
    def auto_terminate=(value)
      synchronize { self.ns_auto_terminate = value }
    end

    private

    # Handler which executes the `fallback_policy` once the queue size
    # reaches `max_queue`.
    #
    # @param [Array] args the arguments to the task which is being handled.
    #
    # @!visibility private
    def handle_fallback(*args)
      case fallback_policy
      when :abort
        raise RejectedExecutionError
      when :discard
        false
      when :caller_runs
        begin
          yield(*args)
        rescue => ex
          # let it fail
          log DEBUG, ex
        end
        true
      else
        fail "Unknown fallback policy #{fallback_policy}"
      end
    end

    def ns_execute(*args, &task)
      raise NotImplementedError
    end

    # @!macro [attach] executor_service_method_ns_shutdown_execution
    #
    #   Callback method called when an orderly shutdown has completed.
    #   The default implementation does nothing; a subclass may override
    #   it, for example to signal all waiting threads.
    def ns_shutdown_execution
      # do nothing
    end

    # @!macro [attach] executor_service_method_ns_kill_execution
    #
    #   Callback method called when the executor has been killed.
    #   The default behavior is to do nothing.
    def ns_kill_execution
      # do nothing
    end

    def ns_auto_terminate?
      !!@auto_terminate
    end

    def ns_auto_terminate=(value)
      case value
      when true
        AtExit.add(self) { terminate_at_exit }
        @auto_terminate = true
      when false
        AtExit.delete(self)
        @auto_terminate = false
      else
        raise ArgumentError
      end
    end

    def terminate_at_exit
      kill # TODO be gentle first
      wait_for_termination(10)
    end
  end
end