1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
|
require 'thread'
require 'timeout'
require 'concurrent/atomic/event'
require 'concurrent/concern/dereferenceable'
module Concurrent
module Concern
module Obligation
include Concern::Dereferenceable
# NOTE: The Dereferenceable module is going away in 2.0. In the mean time
# we need it to place nicely with the synchronization layer. This means
# that the including class SHOULD be synchronized and it MUST implement a
# `#synchronize` method. Not doing so will lead to runtime errors.
# Has the obligation been fulfilled?
#
# @return [Boolean]
def fulfilled?
state == :fulfilled
end
alias_method :realized?, :fulfilled?
# Has the obligation been rejected?
#
# @return [Boolean]
def rejected?
state == :rejected
end
# Is obligation completion still pending?
#
# @return [Boolean]
def pending?
state == :pending
end
# Is the obligation still unscheduled?
#
# @return [Boolean]
def unscheduled?
state == :unscheduled
end
# Has the obligation completed processing?
#
# @return [Boolean]
def complete?
[:fulfilled, :rejected].include? state
end
# Is the obligation still awaiting completion of processing?
#
# @return [Boolean]
def incomplete?
! complete?
end
# The current value of the obligation. Will be `nil` while the state is
# pending or the operation has been rejected.
#
# @param [Numeric] timeout the maximum time in seconds to wait.
# @return [Object] see Dereferenceable#deref
def value(timeout = nil)
wait timeout
deref
end
# Wait until obligation is complete or the timeout has been reached.
#
# @param [Numeric] timeout the maximum time in seconds to wait.
# @return [Obligation] self
def wait(timeout = nil)
event.wait(timeout) if timeout != 0 && incomplete?
self
end
# Wait until obligation is complete or the timeout is reached. Will re-raise
# any exceptions raised during processing (but will not raise an exception
# on timeout).
#
# @param [Numeric] timeout the maximum time in seconds to wait.
# @return [Obligation] self
# @raise [Exception] raises the reason when rejected
def wait!(timeout = nil)
wait(timeout).tap { raise self if rejected? }
end
alias_method :no_error!, :wait!
# The current value of the obligation. Will be `nil` while the state is
# pending or the operation has been rejected. Will re-raise any exceptions
# raised during processing (but will not raise an exception on timeout).
#
# @param [Numeric] timeout the maximum time in seconds to wait.
# @return [Object] see Dereferenceable#deref
# @raise [Exception] raises the reason when rejected
def value!(timeout = nil)
wait(timeout)
if rejected?
raise self
else
deref
end
end
# The current state of the obligation.
#
# @return [Symbol] the current state
def state
synchronize { @state }
end
# If an exception was raised during processing this will return the
# exception object. Will return `nil` when the state is pending or if
# the obligation has been successfully fulfilled.
#
# @return [Exception] the exception raised during processing or `nil`
def reason
synchronize { @reason }
end
# @example allows Obligation to be risen
# rejected_ivar = Ivar.new.fail
# raise rejected_ivar
def exception(*args)
raise 'obligation is not rejected' unless rejected?
reason.exception(*args)
end
protected
# @!visibility private
def get_arguments_from(opts = {})
[*opts.fetch(:args, [])]
end
# @!visibility private
def init_obligation
@event = Event.new
@value = @reason = nil
end
# @!visibility private
def event
@event
end
# @!visibility private
def set_state(success, value, reason)
if success
@value = value
@state = :fulfilled
else
@reason = reason
@state = :rejected
end
end
# @!visibility private
def state=(value)
synchronize { ns_set_state(value) }
end
# Atomic compare and set operation
# State is set to `next_state` only if `current state == expected_current`.
#
# @param [Symbol] next_state
# @param [Symbol] expected_current
#
# @return [Boolean] true is state is changed, false otherwise
#
# @!visibility private
def compare_and_set_state(next_state, *expected_current)
synchronize do
if expected_current.include? @state
@state = next_state
true
else
false
end
end
end
# Executes the block within mutex if current state is included in expected_states
#
# @return block value if executed, false otherwise
#
# @!visibility private
def if_state(*expected_states)
synchronize do
raise ArgumentError.new('no block given') unless block_given?
if expected_states.include? @state
yield
else
false
end
end
end
protected
# Am I in the current state?
#
# @param [Symbol] expected The state to check against
# @return [Boolean] true if in the expected state else false
#
# @!visibility private
def ns_check_state?(expected)
@state == expected
end
# @!visibility private
def ns_set_state(value)
@state = value
end
end
end
end
|