File: simple_executor_service.rb

package info (click to toggle)
ruby-concurrent 1.1.6%2Bdfsg-5
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 30,284 kB
  • sloc: ruby: 30,875; java: 6,117; javascript: 1,114; ansic: 288; makefile: 10; sh: 6
file content (100 lines) | stat: -rw-r--r-- 2,642 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
require 'concurrent/atomics'
require 'concurrent/executor/executor_service'

module Concurrent

  # An executor service in which every operation spawns a new,
  # independently operating thread.
  #
  # This is perhaps the most inefficient executor service in this
  # library. It exists mainly for testing an debugging. Thread creation
  # and management is expensive in Ruby and this executor performs no
  # resource pooling. This can be very beneficial during testing and
  # debugging because it decouples the using code from the underlying
  # executor implementation. In production this executor will likely
  # lead to suboptimal performance.
  #
  # @note Intended for use primarily in testing and debugging.
  class SimpleExecutorService < RubyExecutorService

    # @!macro executor_service_method_post
    def self.post(*args)
      raise ArgumentError.new('no block given') unless block_given?
      Thread.new(*args) do
        Thread.current.abort_on_exception = false
        yield(*args)
      end
      true
    end

    # @!macro executor_service_method_left_shift
    def self.<<(task)
      post(&task)
      self
    end

    # @!macro executor_service_method_post
    def post(*args, &task)
      raise ArgumentError.new('no block given') unless block_given?
      return false unless running?
      @count.increment
      Thread.new(*args) do
        Thread.current.abort_on_exception = false
        begin
          yield(*args)
        ensure
          @count.decrement
          @stopped.set if @running.false? && @count.value == 0
        end
      end
    end

    # @!macro executor_service_method_left_shift
    def <<(task)
      post(&task)
      self
    end

    # @!macro executor_service_method_running_question
    def running?
      @running.true?
    end

    # @!macro executor_service_method_shuttingdown_question
    def shuttingdown?
      @running.false? && ! @stopped.set?
    end

    # @!macro executor_service_method_shutdown_question
    def shutdown?
      @stopped.set?
    end

    # @!macro executor_service_method_shutdown
    def shutdown
      @running.make_false
      @stopped.set if @count.value == 0
      true
    end

    # @!macro executor_service_method_kill
    def kill
      @running.make_false
      @stopped.set
      true
    end

    # @!macro executor_service_method_wait_for_termination
    def wait_for_termination(timeout = nil)
      @stopped.wait(timeout)
    end

    private

    def ns_initialize(*args)
      @running = Concurrent::AtomicBoolean.new(true)
      @stopped = Concurrent::Event.new
      @count = Concurrent::AtomicFixnum.new(0)
    end
  end
end