1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
|
# TODO try stealing pool, each thread has it's own queue
require 'concurrent/promises'
module Concurrent
module Promises
class Future < AbstractEventFuture
# @!macro warn.edge
module ActorIntegration
# Asks the actor with its value.
# @return [Future] new future with the response form the actor
def then_ask(actor)
self.then(actor) { |v, a| a.ask_op(v) }.flat
end
end
include ActorIntegration
# @!macro warn.edge
module FlatShortcuts
# @return [Future]
def then_flat_future(*args, &block)
self.then(*args, &block).flat_future
end
alias_method :then_flat, :then_flat_future
# @return [Future]
def then_flat_future_on(executor, *args, &block)
self.then_on(executor, *args, &block).flat_future
end
alias_method :then_flat_on, :then_flat_future_on
# @return [Event]
def then_flat_event(*args, &block)
self.then(*args, &block).flat_event
end
# @return [Event]
def then_flat_event_on(executor, *args, &block)
self.then_on(executor, *args, &block).flat_event
end
end
include FlatShortcuts
end
class Future < AbstractEventFuture
# @!macro warn.edge
module NewChannelIntegration
# @param [Channel] channel to push to.
# @return [Future] a future which is fulfilled after the message is pushed to the channel.
# May take a moment if the channel is full.
def then_channel_push(channel)
self.then(channel) { |value, ch| ch.push_op value }.flat_future
end
end
include NewChannelIntegration
end
module FactoryMethods
# @!macro promises.shortcut.on
# @return [Future]
# @!macro warn.edge
def zip_futures_over(enumerable, &future_factory)
zip_futures_over_on default_executor, enumerable, &future_factory
end
# Creates new future which is resolved after all the futures created by future_factory from
# enumerable elements are resolved. Simplified it does:
# `zip(*enumerable.map { |e| future e, &future_factory })`
# @example
# # `#succ` calls are executed in parallel
# zip_futures_over_on(:io, [1, 2], &:succ).value! # => [2, 3]
#
# @!macro promises.param.default_executor
# @param [Enumerable] enumerable
# @yield a task to be executed in future
# @yieldparam [Object] element from enumerable
# @yieldreturn [Object] a value of the future
# @return [Future]
# @!macro warn.edge
def zip_futures_over_on(default_executor, enumerable, &future_factory)
# ZipFuturesPromise.new_blocked_by(futures_and_or_events, default_executor).future
zip_futures_on(default_executor, *enumerable.map { |e| future e, &future_factory })
end
end
module Resolvable
include InternalStates
# Reserves the event or future, if reserved others are prevented from resolving it.
# Advanced feature.
# Be careful about the order of reservation to avoid deadlocks,
# the method blocks if the future or event is already reserved
# until it is released or resolved.
#
# @example
# f = Concurrent::Promises.resolvable_future
# reserved = f.reserve
# Thread.new { f.resolve true, :val, nil } # fails
# f.resolve true, :val, nil, true if reserved # must be called only if reserved
# @return [true, false] on successful reservation
def reserve
while true
return true if compare_and_set_internal_state(PENDING, RESERVED)
return false if resolved?
# FIXME (pitr-ch 17-Jan-2019): sleep until given up or resolved instead of busy wait
Thread.pass
end
end
# @return [true, false] on successful release of the reservation
def release
compare_and_set_internal_state(RESERVED, PENDING)
end
# @return [Comparable] an item to sort the resolvable events or futures
# by to get the right global locking order of resolvable events or futures
# @see .atomic_resolution
def self.locking_order_by(resolvable)
resolvable.object_id
end
# Resolves all passed events and futures to the given resolutions
# if possible (all are unresolved) or none.
#
# @param [Hash{Resolvable=>resolve_arguments}, Array<Array(Resolvable, resolve_arguments)>] resolvable_map
# collection of resolvable events and futures which should be resolved all at once
# and what should they be resolved to, examples:
# ```ruby
# { a_resolvable_future1 => [true, :val, nil],
# a_resolvable_future2 => [false, nil, :err],
# a_resolvable_event => [] }
# ```
# or
# ```ruby
# [[a_resolvable_future1, [true, :val, nil]],
# [a_resolvable_future2, [false, nil, :err]],
# [a_resolvable_event, []]]
# ```
# @return [true, false] if success
def self.atomic_resolution(resolvable_map)
# atomic_resolution event => [], future => [true, :v, nil]
sorted = resolvable_map.to_a.sort_by { |resolvable, _| locking_order_by resolvable }
reserved = 0
while reserved < sorted.size && sorted[reserved].first.reserve
reserved += 1
end
if reserved == sorted.size
sorted.each { |resolvable, args| resolvable.resolve(*args, true, true) }
true
else
while reserved > 0
reserved -= 1
raise 'has to be reserved' unless sorted[reserved].first.release
end
false
end
end
end
end
end
|