require 'concurrent/errors'
require 'concurrent/concern/deprecation'
require 'concurrent/executor/executor_service'
require 'concurrent/synchronization/lockable_object'
module Concurrent

  # @!macro abstract_executor_service_public_api
  # @!visibility private
  class AbstractExecutorService < Synchronization::LockableObject
    include ExecutorService
    include Concern::Deprecation

    # The set of possible fallback policies that may be set at thread pool creation.
    FALLBACK_POLICIES = [:abort, :discard, :caller_runs].freeze

    # @!macro executor_service_attr_reader_fallback_policy
    attr_reader :fallback_policy

    attr_reader :name

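    # Create a new thread pool.
    #
    # @param [Hash] opts the options used to configure the executor
    # @option opts [Boolean] :auto_terminate (true) the value reported by
    #   {#auto_terminate?}
    # @option opts [String] :name an optional name that is included in {#to_s}
    def initialize(opts = {}, &block)
      super(&nil)
      synchronize do
        @auto_terminate = opts.fetch(:auto_terminate, true)
        @name = opts.fetch(:name) if opts.key?(:name)
        ns_initialize(opts, &block)
      end
    end

    # Appends the executor's name, when one was given, to the default
    # string representation.
    def to_s
      name ? "#{super[0..-2]} name: #{name}>" : super
    end
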
    # @!macro executor_service_method_shutdown
    def shutdown
      raise NotImplementedError
    end

    # @!macro executor_service_method_kill
    def kill
      raise NotImplementedError
    end

    # @!macro executor_service_method_wait_for_termination
    def wait_for_termination(timeout = nil)
      raise NotImplementedError
    end

    # @!macro executor_service_method_running_question
    def running?
      synchronize { ns_running? }
    end

    # @!macro executor_service_method_shuttingdown_question
    def shuttingdown?
      synchronize { ns_shuttingdown? }
    end

    # @!macro executor_service_method_shutdown_question
    def shutdown?
      synchronize { ns_shutdown? }
    end

    # @!macro executor_service_method_auto_terminate_question
    def auto_terminate?
      synchronize { @auto_terminate }
    end

    # @!macro executor_service_method_auto_terminate_setter
    def auto_terminate=(value)
      deprecated "Method #auto_terminate= has no effect. Set :auto_terminate option when executor is initialized."
    end

    private

    # Returns an action which executes the `fallback_policy` once the queue
    # size reaches `max_queue`. The action is returned rather than run
    # directly so that the work can be deferred until the caller has left
    # the synchronized section.
    #
    # @param [Array] args the arguments to the task which is being handled.
    # @yield the task to run when the policy is `:caller_runs`
    # @return [Proc] a lambda that applies the policy when called
    #
    # @!visibility private
    def fallback_action(*args)
      case fallback_policy
      when :abort
        lambda { raise RejectedExecutionError }
      when :discard
        lambda { false }
      when :caller_runs
        lambda {
          begin
            yield(*args)
          rescue => ex
            # Swallow the error so the caller is not interrupted; log it instead.
            log DEBUG, ex
          end
          true
        }
      else
        lambda { fail "Unknown fallback policy #{fallback_policy}" }
      end
    end

    # Subclasses must implement the actual task execution.
    def ns_execute(*args, &task)
      raise NotImplementedError
    end

    # @!macro executor_service_method_ns_shutdown_execution
    #
    # Callback method called when an orderly shutdown has completed.
    # The default behavior is to do nothing; subclasses override this
    # method when they need to react to the shutdown.
    def ns_shutdown_execution
      # do nothing
    end

    # @!macro executor_service_method_ns_kill_execution
    #
    # Callback method called when the executor has been killed.
    # The default behavior is to do nothing.
    def ns_kill_execution
      # do nothing
    end

    def ns_auto_terminate?
      @auto_terminate
    end
  end
end
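
The deferred fallback pattern described in the `fallback_action` comment can be sketched in isolation, without the gem. Everything in this sketch is hypothetical and exists only for illustration: `BoundedQueueSketch`, `SketchRejectedError`, the `max_queue:` keyword, and the plain `Mutex` are stand-ins, not part of the library's API. The sketch only queues tasks and never runs them, which is enough to show why the policy is returned as a lambda and called after the lock has been released.

```ruby
# Hypothetical stand-in for the library's RejectedExecutionError.
class SketchRejectedError < StandardError; end

# Hypothetical bounded "executor". It only queues tasks and never runs them,
# which is enough to show how a saturated queue is handled.
class BoundedQueueSketch
  def initialize(max_queue:, fallback_policy: :abort)
    @lock = Mutex.new
    @queue = []
    @max_queue = max_queue
    @fallback_policy = fallback_policy
  end

  # Returns true if the task was queued or run by the caller,
  # false if it was discarded.
  def post(*args, &task)
    deferred = @lock.synchronize do
      if @queue.size < @max_queue
        @queue << [task, args]
        nil
      else
        # Build the action only; it must not run while the lock is held.
        fallback_action(*args, &task)
      end
    end
    # The lock has been released here, so a slow or raising task
    # cannot block other threads that want to post.
    deferred ? deferred.call : true
  end

  def queued
    @lock.synchronize { @queue.size }
  end

  private

  def fallback_action(*args)
    case @fallback_policy
    when :abort
      lambda { raise SketchRejectedError }
    when :discard
      lambda { false }
    when :caller_runs
      lambda do
        begin
          yield(*args)
        rescue StandardError
          # Swallow the error so the posting thread is not interrupted.
        end
        true
      end
    else
      lambda { raise ArgumentError, "Unknown fallback policy #{@fallback_policy}" }
    end
  end
end
```

With `max_queue: 1` and `fallback_policy: :caller_runs`, the first `post` is queued and the second runs on the posting thread. With `:discard` the overflowing `post` returns `false`, and with `:abort` it raises the stand-in error.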