File: abstract_executor_service.rb

package info (click to toggle)
ruby-concurrent 1.1.6%2Bdfsg-5
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 30,284 kB
  • sloc: ruby: 30,875; java: 6,117; javascript: 1,114; ansic: 288; makefile: 10; sh: 6
file content (128 lines) | stat: -rw-r--r-- 3,297 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
require 'concurrent/errors'
require 'concurrent/concern/deprecation'
require 'concurrent/executor/executor_service'
require 'concurrent/synchronization'

module Concurrent

  # @!macro abstract_executor_service_public_api
  # @!visibility private
  class AbstractExecutorService < Synchronization::LockableObject
    include ExecutorService
    include Concern::Deprecation

    # The set of possible fallback policies that may be set at thread pool creation.
    FALLBACK_POLICIES = [:abort, :discard, :caller_runs].freeze

    # @!macro executor_service_attr_reader_fallback_policy
    attr_reader :fallback_policy

    attr_reader :name

    # Create a new thread pool.
    def initialize(opts = {}, &block)
      super(&nil)
      synchronize do
        @auto_terminate = opts.fetch(:auto_terminate, true)
        @name = opts.fetch(:name) if opts.key?(:name)
        ns_initialize(opts, &block)
      end
    end

    def to_s
      name ? "#{super[0..-2]} name: #{name}>" : super
    end

    # @!macro executor_service_method_shutdown
    def shutdown
      raise NotImplementedError
    end

    # @!macro executor_service_method_kill
    def kill
      raise NotImplementedError
    end

    # @!macro executor_service_method_wait_for_termination
    def wait_for_termination(timeout = nil)
      raise NotImplementedError
    end

    # @!macro executor_service_method_running_question
    def running?
      synchronize { ns_running? }
    end

    # @!macro executor_service_method_shuttingdown_question
    def shuttingdown?
      synchronize { ns_shuttingdown? }
    end

    # @!macro executor_service_method_shutdown_question
    def shutdown?
      synchronize { ns_shutdown? }
    end

    # @!macro executor_service_method_auto_terminate_question
    def auto_terminate?
      synchronize { @auto_terminate }
    end

    # @!macro executor_service_method_auto_terminate_setter
    def auto_terminate=(value)
      deprecated "Method #auto_terminate= has no effect. Set :auto_terminate option when executor is initialized."
    end

    private

    # Handler which executes the `fallback_policy` once the queue size
    # reaches `max_queue`.
    #
    # @param [Array] args the arguments to the task which is being handled.
    #
    # @!visibility private
    def handle_fallback(*args)
      case fallback_policy
      when :abort
        raise RejectedExecutionError
      when :discard
        false
      when :caller_runs
        begin
          yield(*args)
        rescue => ex
          # let it fail
          log DEBUG, ex
        end
        true
      else
        fail "Unknown fallback policy #{fallback_policy}"
      end
    end

    def ns_execute(*args, &task)
      raise NotImplementedError
    end

    # @!macro executor_service_method_ns_shutdown_execution
    #
    #   Callback method called when an orderly shutdown has completed.
    #   The default behavior is to signal all waiting threads.
    def ns_shutdown_execution
      # do nothing
    end

    # @!macro executor_service_method_ns_kill_execution
    #
    #   Callback method called when the executor has been killed.
    #   The default behavior is to do nothing.
    def ns_kill_execution
      # do nothing
    end

    def ns_auto_terminate?
      @auto_terminate
    end

  end
end