Class: Concurrent::Promises::Future

Inherits:
AbstractEventFuture
Includes:
ActorIntegration, NewChannelIntegration, ThrottleIntegration
Defined in:
lib/concurrent/edge/promises.rb,
lib/concurrent/edge/throttle.rb,
lib/concurrent/edge/old_channel_integration.rb

Overview

Represents a value which will become available in the future. It may instead reject with a reason, e.g. when the task raises an exception.

Direct Known Subclasses

ResolvableFuture

Defined Under Namespace

Modules: ActorIntegration, NewChannelIntegration, ThrottleIntegration

Instance Method Details

- (Future) any(event_or_future) Also known as: |

Creates a new future which will be resolved when the first of the receiver and event_or_future resolves. The returned future will have value nil if event_or_future is an event and resolves first.

Returns:

  • (Future)

# File 'lib/concurrent/edge/promises.rb', line 1048

def any(event_or_future)
  AnyResolvedFuturePromise.new_blocked_by2(self, event_or_future, @DefaultExecutor).future
end

- (Future) delay

Creates a new future dependent on the receiver which will not evaluate until touched; see AbstractEventFuture#touch. In other words, it inserts a delay into the chain of futures, making the rest of the chain lazily evaluated.

Returns:

  • (Future)

# File 'lib/concurrent/edge/promises.rb', line 1058

def delay
  event = DelayPromise.new(@DefaultExecutor).event
  ZipFutureEventPromise.new_blocked_by2(self, event, @DefaultExecutor).future
end

- (Exception) exception(*args)

Allows a rejected Future to be raised with the raise method.

Examples:

raise Promises.rejected_future(StandardError.new("boom"))

Returns:

  • (Exception)

Raises:

  • (StandardError)

    when raising a future that is not rejected



# File 'lib/concurrent/edge/promises.rb', line 983

def exception(*args)
  raise Concurrent::Error, 'it is not rejected' unless rejected?
  reason = Array(internal_state.reason).compact
  if reason.size > 1
    Concurrent::MultipleErrors.new reason
  else
    ex = reason[0].exception(*args)
    ex.set_backtrace ex.backtrace + caller
    ex
  end
end

- (Event) flat_event

Creates a new event which will be resolved when the event returned by the receiver is resolved. Be careful: if the receiver rejects, the new event will just resolve, since an Event does not hold a reason.

Returns:

  • (Event)

# File 'lib/concurrent/edge/promises.rb', line 1093

def flat_event
  FlatEventPromise.new_blocked_by1(self, @DefaultExecutor).event
end

- (Future) flat_future(level = 1) Also known as: flat

Creates a new future which will have the result of the future returned by the receiver. If the receiver rejects, the new future is rejected with the same reason.

Parameters:

  • level (Integer) (defaults to: 1)

    how many levels of futures should be flattened

Returns:

  • (Future)

# File 'lib/concurrent/edge/promises.rb', line 1083

def flat_future(level = 1)
  FlatFuturePromise.new_blocked_by1(self, level, @DefaultExecutor).future
end

- (Boolean) fulfilled?(state = internal_state)

Is it in fulfilled state?

Returns:

  • (Boolean)


# File 'lib/concurrent/edge/promises.rb', line 915

def fulfilled?(state = internal_state)
  state.resolved? && state.fulfilled?
end

- (self) on_fulfillment(*args, &callback)

Shortcut of #on_fulfillment_using with default :io executor supplied.

Returns:

  • (self)

See Also:

  • #on_fulfillment_using

# File 'lib/concurrent/edge/promises.rb', line 1099

def on_fulfillment(*args, &callback)
  on_fulfillment_using @DefaultExecutor, *args, &callback
end

- (self) on_fulfillment!(*args) {|value, *args| ... }

Stores the callback to be executed synchronously on the resolving thread after the future is fulfilled. Does nothing on rejection.

Parameters:

  • args (Object)

    arguments which are passed to the task when it's executed. (They might be preceded by other arguments, see the Yields section.)

Yields:

  • (value, *args)

    to the callback.

Yield Returns:

  • the return value is ignored.

Returns:

  • (self)


# File 'lib/concurrent/edge/promises.rb', line 1110

def on_fulfillment!(*args, &callback)
  add_callback :callback_on_fulfillment, args, callback
end

- (self) on_fulfillment_using(executor, *args) {|value, *args| ... }

Stores the callback to be executed asynchronously on the executor after the future is fulfilled. Does nothing on rejection.

Parameters:

  • executor (Executor, :io, :fast)

    Instance of an executor or a name of the global executor. The task is executed on it; the default executor remains unchanged.

  • args (Object)

    arguments which are passed to the task when it's executed. (They might be preceded by other arguments, see the Yields section.)

Yields:

  • (value, *args)

    to the callback.

Yield Returns:

  • the return value is ignored.

Returns:

  • (self)


# File 'lib/concurrent/edge/promises.rb', line 1122

def on_fulfillment_using(executor, *args, &callback)
  add_callback :async_callback_on_fulfillment, executor, args, callback
end

- (self) on_rejection(*args, &callback)

Shortcut of #on_rejection_using with default :io executor supplied.

Returns:

  • (self)

See Also:

  • #on_rejection_using

# File 'lib/concurrent/edge/promises.rb', line 1128

def on_rejection(*args, &callback)
  on_rejection_using @DefaultExecutor, *args, &callback
end

- (self) on_rejection!(*args) {|reason, *args| ... }

Stores the callback to be executed synchronously on the resolving thread after the future is rejected. Does nothing on fulfillment.

Parameters:

  • args (Object)

    arguments which are passed to the task when it's executed. (They might be preceded by other arguments, see the Yields section.)

Yields:

  • (reason, *args)

    to the callback.

Yield Returns:

  • the return value is ignored.

Returns:

  • (self)


# File 'lib/concurrent/edge/promises.rb', line 1139

def on_rejection!(*args, &callback)
  add_callback :callback_on_rejection, args, callback
end

- (self) on_rejection_using(executor, *args) {|reason, *args| ... }

Stores the callback to be executed asynchronously on the executor after the future is rejected. Does nothing on fulfillment.

Parameters:

  • executor (Executor, :io, :fast)

    Instance of an executor or a name of the global executor. The task is executed on it; the default executor remains unchanged.

  • args (Object)

    arguments which are passed to the task when it's executed. (They might be preceded by other arguments, see the Yields section.)

Yields:

  • (reason, *args)

    to the callback.

Yield Returns:

  • the return value is ignored.

Returns:

  • (self)


# File 'lib/concurrent/edge/promises.rb', line 1151

def on_rejection_using(executor, *args, &callback)
  add_callback :async_callback_on_rejection, executor, args, callback
end

- (Exception, nil) reason(timeout = nil)

Note:

This method potentially blocks the current thread until the Future is resolved. Be careful, it can deadlock. Try to chain instead.

Note:

Make sure a returned nil is not confused with a timeout, no value when rejected, no reason when fulfilled, etc. Use more exact methods if needed, like AbstractEventFuture#wait, #value!, #result, etc.

Returns the reason of the future's rejection. Calls AbstractEventFuture#touch.

Parameters:

  • timeout (Numeric) (defaults to: nil)

    the maximum time in seconds to wait.

Returns:

  • (Exception, nil)

    nil on timeout or fulfillment.



# File 'lib/concurrent/edge/promises.rb', line 949

def reason(timeout = nil)
  internal_state.reason if wait_until_resolved timeout
end

- (Boolean) rejected?(state = internal_state)

Is it in rejected state?

Returns:

  • (Boolean)


# File 'lib/concurrent/edge/promises.rb', line 921

def rejected?(state = internal_state)
  state.resolved? && !state.fulfilled?
end

- (Future) rescue(*args, &task)

Shortcut of #rescue_on with default :io executor supplied.

Returns:

  • (Future)

See Also:

  • #rescue_on

# File 'lib/concurrent/edge/promises.rb', line 1015

def rescue(*args, &task)
  rescue_on @DefaultExecutor, *args, &task
end

- (Future) rescue_on(executor, *args) {|reason, *args| ... }

Chains the task to be executed asynchronously on the executor after the receiver rejects. Does not run the task if the receiver fulfills; the returned future still resolves, triggering any dependent futures.

Parameters:

  • executor (Executor, :io, :fast)

    Instance of an executor or a name of the global executor. The task is executed on it; the default executor remains unchanged.

  • args (Object)

    arguments which are passed to the task when it's executed. (They might be preceded by other arguments, see the Yields section.)

Yields:

  • (reason, *args)

    to the task.

Yield Returns:

  • will become the result of the returned Future. Its returned value becomes #value, fulfilling it; a raised exception becomes #reason, rejecting it.

Returns:

  • (Future)

# File 'lib/concurrent/edge/promises.rb', line 1027

def rescue_on(executor, *args, &task)
  RescuePromise.new_blocked_by1(self, @DefaultExecutor, executor, args, &task).future
end

- (Promises::Future) rescue_throttled_by(throttle, *args, &block) Originally defined in module ThrottleIntegration

Behaves as Concurrent::Promises::Future#rescue but it is throttled.

- (Array(Boolean, Object, Exception), nil) result(timeout = nil)

Note:

This method potentially blocks the current thread until the Future is resolved. Be careful, it can deadlock. Try to chain instead.

Returns the triplet of fulfilled?, value, and reason. Calls AbstractEventFuture#touch.

Parameters:

  • timeout (Numeric) (defaults to: nil)

    the maximum time in seconds to wait.

Returns:

  • (Array(Boolean, Object, Exception), nil)

    triplet of fulfilled?, value, reason, or nil on timeout.



# File 'lib/concurrent/edge/promises.rb', line 960

def result(timeout = nil)
  internal_state.result if wait_until_resolved timeout
end

- (Future) run

Allows futures to be used as green threads. The receiver has to evaluate to a future which represents what should be done next. It basically keeps flattening until a non-Future value is returned, which becomes the result of the returned future. Any encountered exception becomes the reason of the returned future.

Examples:

body = lambda do |v|
  v += 1
  v < 5 ? Promises.future(v, &body) : v
end
Promises.future(0, &body).run.value! # => 5

Returns:

  • (Future)

# File 'lib/concurrent/edge/promises.rb', line 1167

def run
  RunFuturePromise.new_blocked_by1(self, @DefaultExecutor).future
end

- (Future) schedule(intended_time)

Creates a new future dependent on the receiver, scheduled to execute on/in intended_time. A relative ("in") time is interpreted from the moment the receiver is resolved; therefore it inserts a delay into the chain.

Parameters:

  • intended_time (Numeric, Time)

    Numeric means to run in intended_time seconds. Time means to run on intended_time.

Returns:

  • (Future)

# File 'lib/concurrent/edge/promises.rb', line 1065

def schedule(intended_time)
  chain do
    event = ScheduledPromise.new(@DefaultExecutor, intended_time).event
    ZipFutureEventPromise.new_blocked_by2(self, event, @DefaultExecutor).future
  end.flat
end

- (Future) then(*args, &task)

Shortcut of #then_on with default :io executor supplied.

Returns:

  • (Future)

See Also:

  • #then_on

# File 'lib/concurrent/edge/promises.rb', line 997

def then(*args, &task)
  then_on @DefaultExecutor, *args, &task
end

- (Future) then_ask(actor) Originally defined in module ActorIntegration

Asks the actor with its value.

Returns:

  • (Future)

    new future with the response from the actor

- (Future) then_on(executor, *args) {|value, *args| ... }

Chains the task to be executed asynchronously on the executor after the receiver fulfills. Does not run the task if the receiver rejects; the returned future still resolves, triggering any dependent futures.

Parameters:

  • executor (Executor, :io, :fast)

    Instance of an executor or a name of the global executor. The task is executed on it; the default executor remains unchanged.

  • args (Object)

    arguments which are passed to the task when it's executed. (They might be preceded by other arguments, see the Yields section.)

Yields:

  • (value, *args)

    to the task.

Yield Returns:

  • will become the result of the returned Future. Its returned value becomes #value, fulfilling it; a raised exception becomes #reason, rejecting it.

Returns:

  • (Future)

# File 'lib/concurrent/edge/promises.rb', line 1009

def then_on(executor, *args, &task)
  ThenPromise.new_blocked_by1(self, @DefaultExecutor, executor, args, &task).future
end

- (Future) then_push_channel(channel) Originally defined in module NewChannelIntegration

Returns a future which is fulfilled after the message is pushed to the channel. May take a moment if the channel is full.

Parameters:

  • channel (Channel)

    to push to.

Returns:

  • (Future)

    a future which is fulfilled after the message is pushed to the channel. May take a moment if the channel is full.

- (Promises::Future) then_throttled_by(throttle, *args, &block) Originally defined in module ThrottleIntegration

Behaves as Concurrent::Promises::Future#then but it is throttled.

Examples:

data     = (1..5).to_a
db       = data.reduce({}) { |h, v| h.update v => v.to_s }
max_two  = Throttle.new 2

futures = data.map do |data|
  Promises.future(data) do |data|
    # un-throttled, concurrency level equal data.size
    data + 1
  end.then_throttled_by(max_two, db) do |v, db|
    # throttled, only 2 tasks executed at the same time
    # e.g. limiting access to db
    db[v]
  end
end

futures.map(&:value!) # => ["2", "3", "4", "5", nil]

Returns:

  • (Promises::Future)

See Also:

  • #then

- (Event) to_event

Converts the future to an event which is resolved when the future is resolved, whether by fulfillment or rejection.

Returns:

  • (Event)

# File 'lib/concurrent/edge/promises.rb', line 1179

def to_event
  event = Promises.resolvable_event
ensure
  chain_resolvable(event)
end

- (Future) to_future

Returns self, since this is a future

Returns:

  • (Future)

# File 'lib/concurrent/edge/promises.rb', line 1187

def to_future
  self
end

- (Object, nil) value(timeout = nil)

Note:

This method potentially blocks the current thread until the Future is resolved. Be careful, it can deadlock. Try to chain instead.

Note:

Make sure a returned nil is not confused with a timeout, no value when rejected, no reason when fulfilled, etc. Use more exact methods if needed, like AbstractEventFuture#wait, #value!, #result, etc.

Returns the value of the future. Calls AbstractEventFuture#touch.

Parameters:

  • timeout (Numeric) (defaults to: nil)

    the maximum time in seconds to wait.

Returns:

  • (Object, nil)

    the value of the Future when fulfilled, nil on timeout or rejection.



# File 'lib/concurrent/edge/promises.rb', line 938

def value(timeout = nil)
  internal_state.value if wait_until_resolved timeout
end

- (Object, nil) value!(timeout = nil)

Note:

This method potentially blocks the current thread until the Future is resolved. Be careful, it can deadlock. Try to chain instead.

Note:

Make sure a returned nil is not confused with a timeout, no value when rejected, no reason when fulfilled, etc. Use more exact methods if needed, like AbstractEventFuture#wait, #value!, #result, etc.

Returns the value of the future. Calls AbstractEventFuture#touch.

Parameters:

  • timeout (Numeric) (defaults to: nil)

    the maximum time in seconds to wait.

Returns:

  • (Object, nil)

    the value of the Future when fulfilled, nil on timeout.

Raises:

  • (Exception)

    #reason on rejection



# File 'lib/concurrent/edge/promises.rb', line 974

def value!(timeout = nil)
  internal_state.value if wait_until_resolved! timeout
end

- (Future, true, false) wait!(timeout = nil)

Note:

This method potentially blocks the current thread until the Future is resolved. Be careful, it can deadlock. Try to chain instead.

Waits (blocks the thread) until the receiver is AbstractEventFuture#resolved?. Calls AbstractEventFuture#touch.

Parameters:

  • timeout (Numeric) (defaults to: nil)

    the maximum time in seconds to wait.

Returns:

  • (Future, true, false)

    self implies timeout was not used, true implies timeout was used and it was resolved, false implies it was not resolved within timeout.

Raises:

  • (Exception)

    #reason on rejection



# File 'lib/concurrent/edge/promises.rb', line 966

def wait!(timeout = nil)
  result = wait_until_resolved!(timeout)
  timeout ? result : self
end

- (Future) with_default_executor(executor)

Creates a new object of the same class with the executor set as its new default executor. Any futures depending on it will use the new default executor.

Returns:

  • (Future)

# File 'lib/concurrent/edge/promises.rb', line 1074

def with_default_executor(executor)
  FutureWrapperPromise.new_blocked_by1(self, executor).future
end

- (Future) zip(other) Also known as: &

Creates a new event or future which will be resolved when both the receiver and other are resolved. Returns an event if both the receiver and other are events; otherwise returns a future. If just one of the parties is a Future, the result of the returned future is equal to the result of that future. If both are futures, the result is as described in Concurrent::Promises::FactoryMethods#zip_futures_on.

Returns:

  • (Future)

# File 'lib/concurrent/edge/promises.rb', line 1033

def zip(other)
  if other.is_a?(Future)
    ZipFuturesPromise.new_blocked_by2(self, other, @DefaultExecutor).future
  else
    ZipFutureEventPromise.new_blocked_by2(self, other, @DefaultExecutor).future
  end
end