File: future.rb

package info (click to toggle)
ruby-concurrent 1.1.6%2Bdfsg-5
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 30,284 kB
  • sloc: ruby: 30,875; java: 6,117; javascript: 1,114; ansic: 288; makefile: 10; sh: 6
file content (141 lines) | stat: -rw-r--r-- 4,289 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
require 'thread'
require 'concurrent/constants'
require 'concurrent/errors'
require 'concurrent/ivar'
require 'concurrent/executor/safe_task_executor'

require 'concurrent/options'

# TODO (pitr-ch 14-Mar-2017): deprecate Future, Promise, etc.


module Concurrent

  # {include:file:docs-source/future.md}
  #
  # @!macro copy_options
  #
  # @see http://ruby-doc.org/stdlib-2.1.1/libdoc/observer/rdoc/Observable.html Ruby Observable module
  # @see http://clojuredocs.org/clojure_core/clojure.core/future Clojure's future function
  # @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Future.html java.util.concurrent.Future
  class Future < IVar

    # Create a new `Future` in the `:unscheduled` state.
    #
    # @yield the asynchronous operation to perform
    #
    # @!macro executor_and_deref_options
    #
    # @option opts [object, Array] :args zero or more arguments to be passed the task
    #   block on execution
    #
    # @raise [ArgumentError] if no block is given
    def initialize(opts = {}, &block)
      raise ArgumentError, 'no block given' unless block_given?

      # The task block is handed to `ns_initialize` through the options hash.
      # The explicit `&nil` keeps Ruby from implicitly forwarding the block to
      # the superclass constructor as well.
      super(NULL, opts.merge(__task_from_block__: block), &nil)
    end

    # Execute an `:unscheduled` `Future`. Immediately sets the state to `:pending` and
    # passes the block to a new thread/thread pool for eventual execution.
    # Does nothing if the `Future` is in any state other than `:unscheduled`.
    #
    # @return [Future] a reference to `self`, whether or not this call was the
    #   one that scheduled the task
    #
    # @example Instance and execute in separate steps
    #   future = Concurrent::Future.new{ sleep(1); 42 }
    #   future.state #=> :unscheduled
    #   future.execute
    #   future.state #=> :pending
    #
    # @example Instance and execute in one line
    #   future = Concurrent::Future.new{ sleep(1); 42 }.execute
    #   future.state #=> :pending
    def execute
      # Post the task only when the state could be moved from :unscheduled to
      # :pending; in every other state this call is a no-op.
      if compare_and_set_state(:pending, :unscheduled)
        @executor.post { safe_execute(@task, @args) }
      end

      # Always return self so the documented contract holds and calls such as
      # `future.execute.value` keep working when the future was already scheduled.
      self
    end

    # Create a new `Future` object with the given block, execute it, and return the
    # `:pending` object.
    #
    # @yield the asynchronous operation to perform
    #
    # @!macro executor_and_deref_options
    #
    # @option opts [object, Array] :args zero or more arguments to be passed the task
    #   block on execution
    #
    # @raise [ArgumentError] if no block is given
    #
    # @return [Future] the newly created `Future` in the `:pending` state
    #
    # @example
    #   future = Concurrent::Future.execute{ sleep(1); 42 }
    #   future.state #=> :pending
    def self.execute(opts = {}, &block)
      Future.new(opts, &block).execute
    end

    # Replace the task of an `:unscheduled` `Future` with the given block (or
    # with a block returning the given value) and then schedule it via
    # {#execute}.
    #
    # @!macro ivar_set_method
    #
    # @raise [MultipleAssignmentError] if the `Future` is in any state other
    #   than `:unscheduled`
    def set(value = NULL, &block)
      check_for_block_or_value!(block_given?, value)
      synchronize do
        # The task may only be swapped while nothing has been scheduled yet.
        raise MultipleAssignmentError if @state != :unscheduled

        @task = block || Proc.new { value }
      end
      execute
    end

    # Attempt to cancel the operation if it has not already processed.
    # The operation can only be cancelled while still `pending`. It cannot
    # be cancelled once it has begun processing or has completed.
    #
    # @return [Boolean] was the operation successfully cancelled.
    def cancel
      if compare_and_set_state(:cancelled, :pending)
        # NOTE(review): `complete` is inherited and not visible here; confirm
        # which state it leaves the future in, because `cancelled?` only
        # reports true while the state is exactly `:cancelled`.
        complete(false, nil, CancelledOperationError.new)
        true
      else
        false
      end
    end

    # Has the operation been successfully cancelled?
    #
    # @return [Boolean] true when the current state is `:cancelled`
    def cancelled?
      state == :cancelled
    end

    # Wait the given number of seconds for the operation to complete.
    # On timeout attempt to cancel the operation.
    #
    # @param [Numeric] timeout the maximum time in seconds to wait.
    # @return [Boolean] true if the operation completed before the timeout
    #   else false
    def wait_or_cancel(timeout)
      wait(timeout)
      return true if complete?

      # Not complete after waiting: try to cancel. The result of `cancel` is
      # deliberately ignored; this method reports only whether it completed.
      cancel
      false
    end

    protected

    # Set up the instance state of a new `Future`.
    #
    # @param value the initial value, passed through unchanged to the superclass
    # @param [Hash] opts the options given to the constructor; the task block
    #   is read from `:__task_from_block__`
    def ns_initialize(value, opts)
      super
      @state = :unscheduled
      @task = opts[:__task_from_block__]
      # Fall back to the global IO executor when the options name no executor.
      @executor = Options.executor_from_options(opts) || Concurrent.global_io_executor
      @args = get_arguments_from(opts)
    end
  end
end