File: obligation.rb

package info (click to toggle)
ruby-concurrent 1.1.6%2Bdfsg-5
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 30,284 kB
  • sloc: ruby: 30,875; java: 6,117; javascript: 1,114; ansic: 288; makefile: 10; sh: 6
file content (220 lines) | stat: -rw-r--r-- 5,994 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
require 'thread'
require 'timeout'

require 'concurrent/atomic/event'
require 'concurrent/concern/dereferenceable'

module Concurrent
  module Concern

    # Mixin providing the common state queries and blocking accessors for
    # objects whose result becomes available at a later time. The state is
    # tracked as one of the symbols `:unscheduled`, `:pending`, `:fulfilled`
    # or `:rejected`.
    module Obligation
      include Concern::Dereferenceable
      # NOTE: The Dereferenceable module is going away in 2.0. In the mean time
      # we need it to play nicely with the synchronization layer. This means
      # that the including class SHOULD be synchronized and it MUST implement a
      # `#synchronize` method. Not doing so will lead to runtime errors.

      # Did the obligation finish successfully?
      #
      # @return [Boolean] true when the state is `:fulfilled`
      def fulfilled?
        state == :fulfilled
      end
      alias_method :realized?, :fulfilled?

      # Did the obligation finish with a failure?
      #
      # @return [Boolean] true when the state is `:rejected`
      def rejected?
        state == :rejected
      end

      # Is the obligation still waiting to complete?
      #
      # @return [Boolean] true when the state is `:pending`
      def pending?
        state == :pending
      end

      # Has the obligation not yet been scheduled?
      #
      # @return [Boolean] true when the state is `:unscheduled`
      def unscheduled?
        state == :unscheduled
      end

      # Has processing finished, whether successfully or not?
      #
      # @return [Boolean] true when the state is `:fulfilled` or `:rejected`
      def complete?
        # Read the state exactly once so both comparisons see the same value.
        current = state
        [:fulfilled, :rejected].include?(current)
      end

      # Is processing still outstanding? The inverse of {#complete?}.
      #
      # @return [Boolean]
      def incomplete?
        !complete?
      end

      # Waits (up to `timeout`) for completion and then returns the
      # dereferenced value. The value is `nil` while the obligation is pending
      # or when it has been rejected.
      #
      # @param [Numeric] timeout the maximum time in seconds to wait.
      # @return [Object] see Dereferenceable#deref
      def value(timeout = nil)
        wait(timeout)
        deref
      end

      # Blocks until the obligation completes or the timeout elapses. A
      # timeout of `0` never blocks.
      #
      # @param [Numeric] timeout the maximum time in seconds to wait.
      # @return [Obligation] self
      def wait(timeout = nil)
        return self if timeout == 0

        event.wait(timeout) if incomplete?
        self
      end

      # Like {#wait}, but raises the failure reason when the obligation has
      # been rejected. Reaching the timeout does not raise.
      #
      # @param [Numeric] timeout the maximum time in seconds to wait.
      # @return [Obligation] self
      # @raise [Exception] raises the reason when rejected
      def wait!(timeout = nil)
        waited = wait(timeout)
        # `raise self` goes through #exception, which yields the stored reason.
        raise self if rejected?
        waited
      end
      alias_method :no_error!, :wait!

      # Like {#value}, but raises the failure reason when the obligation has
      # been rejected. Reaching the timeout does not raise; the value is `nil`
      # while the obligation is still pending.
      #
      # @param [Numeric] timeout the maximum time in seconds to wait.
      # @return [Object] see Dereferenceable#deref
      # @raise [Exception] raises the reason when rejected
      def value!(timeout = nil)
        wait(timeout)
        # `raise self` goes through #exception, which yields the stored reason.
        raise self if rejected?
        deref
      end

      # Reads the current state under the object's lock.
      #
      # @return [Symbol] the current state
      def state
        synchronize { @state }
      end

      # Reads the stored failure reason under the object's lock. It is `nil`
      # while the obligation is pending and when it was fulfilled.
      #
      # @return [Exception] the exception raised during processing or `nil`
      def reason
        synchronize { @reason }
      end

      # Makes the obligation usable as the argument of `raise`.
      #
      # @example allows Obligation to be raised
      #   rejected_ivar = Ivar.new.fail
      #   raise rejected_ivar
      #
      # @param [Array] args forwarded to the reason's `#exception`
      # @return [Exception] the exception built from the rejection reason
      # @raise [RuntimeError] when the obligation is not rejected
      def exception(*args)
        if rejected?
          reason.exception(*args)
        else
          raise 'obligation is not rejected'
        end
      end

      protected

      # Extracts the `:args` option as an array; defaults to an empty array.
      #
      # @!visibility private
      def get_arguments_from(opts = {})
        raw_args = opts.fetch(:args, [])
        [*raw_args]
      end

      # Creates the completion event and clears the value and reason.
      #
      # @!visibility private
      def init_obligation
        @event = Event.new
        @value = nil
        @reason = nil
      end

      # The event waited on by {#wait}.
      #
      # @!visibility private
      def event
        @event
      end

      # Records the outcome: the value and `:fulfilled` on success, otherwise
      # the reason and `:rejected`. Does not take the lock itself.
      #
      # @!visibility private
      def set_state(success, value, reason)
        unless success
          @reason = reason
          return @state = :rejected
        end

        @value = value
        @state = :fulfilled
      end

      # Assigns the state while holding the lock.
      #
      # @!visibility private
      def state=(value)
        synchronize { ns_set_state(value) }
      end

      # Atomic compare and set operation.
      # The state becomes `next_state` only if the current state is one of
      # `expected_current`.
      #
      # @param [Symbol] next_state
      # @param [Symbol] expected_current one or more acceptable current states
      #
      # @return [Boolean] true if state is changed, false otherwise
      #
      # @!visibility private
      def compare_and_set_state(next_state, *expected_current)
        synchronize do
          matched = expected_current.include?(@state)
          @state = next_state if matched
          matched
        end
      end

      # Runs the block while holding the lock, but only when the current state
      # is one of `expected_states`.
      #
      # @return block value if executed, false otherwise
      # @raise [ArgumentError] when no block is given
      #
      # @!visibility private
      def if_state(*expected_states)
        synchronize do
          raise ArgumentError, 'no block given' unless block_given?

          expected_states.include?(@state) ? yield : false
        end
      end

      # The `ns_` helpers below do not synchronize; they remain protected.

      # Does the current state equal `expected`?
      #
      # @param [Symbol] expected The state to check against
      # @return [Boolean] true if in the expected state else false
      #
      # @!visibility private
      def ns_check_state?(expected)
        @state == expected
      end

      # Assigns the state without taking the lock.
      #
      # @!visibility private
      def ns_set_state(value)
        @state = value
      end
    end
  end
end