require 'concurrent/errors'
require 'concurrent/concern/deprecation'
require 'concurrent/executor/executor_service'
require 'concurrent/synchronization'

module Concurrent

  # @!macro abstract_executor_service_public_api
  # @!visibility private
  class AbstractExecutorService < Synchronization::LockableObject
    include ExecutorService
    include Concern::Deprecation

    # The set of possible fallback policies that may be set at thread pool creation.
    FALLBACK_POLICIES = [:abort, :discard, :caller_runs].freeze

    # @!macro executor_service_attr_reader_fallback_policy
    attr_reader :fallback_policy

    attr_reader :name

    # Create a new thread pool.
    def initialize(opts = {}, &block)
      super(&nil)
      synchronize do
        @auto_terminate = opts.fetch(:auto_terminate, true)
        @name = opts.fetch(:name) if opts.key?(:name)
        ns_initialize(opts, &block)
      end
    end

    def to_s
      name ? "#{super[0..-2]} name: #{name}>" : super
    end

    # @!macro executor_service_method_shutdown
    def shutdown
      raise NotImplementedError
    end

    # @!macro executor_service_method_kill
    def kill
      raise NotImplementedError
    end

    # @!macro executor_service_method_wait_for_termination
    def wait_for_termination(timeout = nil)
      raise NotImplementedError
    end

    # @!macro executor_service_method_running_question
    def running?
      synchronize { ns_running? }
    end

    # @!macro executor_service_method_shuttingdown_question
    def shuttingdown?
      synchronize { ns_shuttingdown? }
    end

    # @!macro executor_service_method_shutdown_question
    def shutdown?
      synchronize { ns_shutdown? }
    end

    # @!macro executor_service_method_auto_terminate_question
    def auto_terminate?
      synchronize { @auto_terminate }
    end

    # @!macro executor_service_method_auto_terminate_setter
    def auto_terminate=(value)
      deprecated "Method #auto_terminate= has no effect. Set :auto_terminate option when executor is initialized."
    end

    private

    # Handler which applies the `fallback_policy` when a task cannot be
    # accepted, for example once the queue size reaches `max_queue`.
    #
    # @param [Array] args the arguments to the task which is being handled.
    #
    # @!visibility private
    def handle_fallback(*args)
      case fallback_policy
      when :abort
        raise RejectedExecutionError
      when :discard
        false
      when :caller_runs
        begin
          yield(*args)
        rescue => ex
          # the task's error is not re-raised; it is only logged at debug level
          log DEBUG, ex
        end
        true
      else
        fail "Unknown fallback policy #{fallback_policy}"
      end
    end

    def ns_execute(*args, &task)
      raise NotImplementedError
    end

    # @!macro executor_service_method_ns_shutdown_execution
    #
    # Callback method called when an orderly shutdown has completed.
    # The default behavior is to do nothing.
    def ns_shutdown_execution
      # do nothing
    end

    # @!macro executor_service_method_ns_kill_execution
    #
    # Callback method called when the executor has been killed.
    # The default behavior is to do nothing.
    def ns_kill_execution
      # do nothing
    end

    def ns_auto_terminate?
      @auto_terminate
    end
  end
end
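
# The three fallback policies handled by `handle_fallback` above can be tried
# in isolation with a small stand-in. This is only a sketch: `FallbackSketch`
# and `FallbackSketch::RejectedError` are hypothetical names that are not part
# of this library, the method is public here purely for demonstration, and
# `warn` stands in for the library's debug logging.

```ruby
# Hypothetical stand-in used for illustration; not part of concurrent-ruby.
class FallbackSketch
  # Stand-in for RejectedExecutionError (assumed name, defined locally).
  RejectedError = Class.new(StandardError)

  POLICIES = [:abort, :discard, :caller_runs].freeze

  attr_reader :fallback_policy

  def initialize(fallback_policy)
    unless POLICIES.include?(fallback_policy)
      raise ArgumentError, "#{fallback_policy} is not a valid fallback policy"
    end
    @fallback_policy = fallback_policy
  end

  # Same dispatch shape as handle_fallback:
  #   :abort       raises, so the submitter learns the task was rejected
  #   :discard     drops the task and returns false
  #   :caller_runs runs the task on the calling thread and returns true
  def handle_fallback(*args)
    case fallback_policy
    when :abort
      raise RejectedError
    when :discard
      false
    when :caller_runs
      begin
        yield(*args)
      rescue StandardError => ex
        # The error is not re-raised; it is only reported.
        warn "task failed: #{ex.message}"
      end
      true
    end
  end
end

ran = []
FallbackSketch.new(:caller_runs).handle_fallback(2, 3) { |a, b| ran << a + b }
FallbackSketch.new(:discard).handle_fallback(1) { |a| ran << a }
```

# With `:caller_runs` the block runs immediately on the submitting thread, so
# `ran` receives the sum; with `:discard` the block is never called.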