Module: Concurrent
- Defined in:
- lib/concurrent.rb,
lib/concurrent/map.rb,
lib/concurrent/atom.rb,
lib/concurrent/edge.rb,
lib/concurrent/ivar.rb,
lib/concurrent/mvar.rb,
lib/concurrent/tvar.rb,
lib/concurrent/hash.rb,
lib/concurrent/async.rb,
lib/concurrent/delay.rb,
lib/concurrent/tuple.rb,
lib/concurrent/maybe.rb,
lib/concurrent/actor.rb,
lib/concurrent/agent.rb,
lib/concurrent/array.rb,
lib/concurrent/errors.rb,
lib/concurrent/future.rb,
lib/concurrent/channel.rb,
lib/concurrent/version.rb,
lib/concurrent/promise.rb,
lib/concurrent/options.rb,
lib/concurrent/dataflow.rb,
lib/concurrent/constants.rb,
lib/concurrent/exchanger.rb,
lib/concurrent/timer_task.rb,
lib/concurrent/actor/root.rb,
lib/concurrent/actor/core.rb,
lib/concurrent/actor/utils.rb,
lib/concurrent/atomic/event.rb,
lib/concurrent/actor/errors.rb,
lib/concurrent/channel/tick.rb,
lib/concurrent/edge/throttle.rb,
lib/concurrent/configuration.rb,
lib/concurrent/actor/context.rb,
lib/concurrent/edge/promises.rb,
lib/concurrent/lazy_register.rb,
lib/concurrent/mutable_struct.rb,
lib/concurrent/scheduled_task.rb,
lib/concurrent/actor/envelope.rb,
lib/concurrent/utility/engine.rb,
lib/concurrent/utility/at_exit.rb,
lib/concurrent/settable_struct.rb,
lib/concurrent/actor/reference.rb,
lib/concurrent/concern/logging.rb,
lib/concurrent/synchronization.rb,
lib/concurrent/actor/behaviour.rb,
lib/concurrent/immutable_struct.rb,
lib/concurrent/thread_safe/util.rb,
lib/concurrent/actor/type_check.rb,
lib/concurrent/channel/selector.rb,
lib/concurrent/atomic/semaphore.rb,
lib/concurrent/actor/utils/pool.rb,
lib/concurrent/edge/cancellation.rb,
lib/concurrent/executor/timer_set.rb,
lib/concurrent/actor/utils/ad_hoc.rb,
lib/concurrent/concern/observable.rb,
lib/concurrent/concern/obligation.rb,
lib/concurrent/channel/buffer/base.rb,
lib/concurrent/concern/deprecation.rb,
lib/concurrent/atomic/atomic_fixnum.rb,
lib/concurrent/edge/lock_free_queue.rb,
lib/concurrent/synchronization/lock.rb,
lib/concurrent/atomic_reference/rbx.rb,
lib/concurrent/channel/buffer/timer.rb,
lib/concurrent/actor/utils/balancer.rb,
lib/concurrent/edge/lock_free_stack.rb,
lib/concurrent/atomic/atomic_boolean.rb,
lib/concurrent/channel/buffer/ticker.rb,
lib/concurrent/edge/processing_actor.rb,
lib/concurrent/actor/utils/broadcast.rb,
lib/concurrent/atomic/cyclic_barrier.rb,
lib/concurrent/atomic_reference/ruby.rb,
lib/concurrent/atomic/mutex_semaphore.rb,
lib/concurrent/actor/behaviour/awaits.rb,
lib/concurrent/actor/behaviour/buffer.rb,
lib/concurrent/atomic/read_write_lock.rb,
lib/concurrent/atomic_reference/jruby.rb,
lib/concurrent/channel/buffer/sliding.rb,
lib/concurrent/synchronization/object.rb,
lib/concurrent/thread_safe/util/adder.rb,
lib/concurrent/utility/monotonic_time.rb,
lib/concurrent/utility/native_integer.rb,
lib/concurrent/actor/behaviour/pausing.rb,
lib/concurrent/channel/buffer/dropping.rb,
lib/concurrent/concern/dereferenceable.rb,
lib/concurrent/channel/buffer/buffered.rb,
lib/concurrent/atomic/thread_local_var.rb,
lib/concurrent/actor/behaviour/linking.rb,
lib/concurrent/atomic/count_down_latch.rb,
lib/concurrent/actor/public_delegations.rb,
lib/concurrent/actor/behaviour/abstract.rb,
lib/concurrent/synchronization/volatile.rb,
lib/concurrent/edge/lock_free_linked_set.rb,
lib/concurrent/thread_safe/util/volatile.rb,
lib/concurrent/utility/processor_counter.rb,
lib/concurrent/executor/executor_service.rb,
lib/concurrent/synchronization/condition.rb,
lib/concurrent/channel/buffer/unbuffered.rb,
lib/concurrent/synchronization/mri_object.rb,
lib/concurrent/synchronization/rbx_object.rb,
lib/concurrent/actor/internal_delegations.rb,
lib/concurrent/atomic/mutex_atomic_fixnum.rb,
lib/concurrent/thread_safe/util/striped64.rb,
lib/concurrent/executor/fixed_thread_pool.rb,
lib/concurrent/executor/cached_thread_pool.rb,
lib/concurrent/channel/selector/put_clause.rb,
lib/concurrent/executor/immediate_executor.rb,
lib/concurrent/executor/safe_task_executor.rb,
lib/concurrent/atomic/mutex_atomic_boolean.rb,
lib/concurrent/actor/behaviour/termination.rb,
lib/concurrent/actor/behaviour/supervising.rb,
lib/concurrent/atomic/ruby_thread_local_var.rb,
lib/concurrent/synchronization/jruby_object.rb,
lib/concurrent/edge/old_channel_integration.rb,
lib/concurrent/channel/selector/take_clause.rb,
lib/concurrent/atomic/java_thread_local_var.rb,
lib/concurrent/atomic/java_count_down_latch.rb,
lib/concurrent/actor/behaviour/sets_results.rb,
lib/concurrent/actor/behaviour/removes_child.rb,
lib/concurrent/channel/selector/error_clause.rb,
lib/concurrent/executor/serialized_execution.rb,
lib/concurrent/executor/thread_pool_executor.rb,
lib/concurrent/atomic/mutex_count_down_latch.rb,
lib/concurrent/atomic_reference/mutex_atomic.rb,
lib/concurrent/channel/selector/after_clause.rb,
lib/concurrent/executor/java_executor_service.rb,
lib/concurrent/edge/atomic_markable_reference.rb,
lib/concurrent/edge/lock_free_linked_set/node.rb,
lib/concurrent/executor/ruby_executor_service.rb,
lib/concurrent/synchronization/truffle_object.rb,
lib/concurrent/atomic_reference/direct_update.rb,
lib/concurrent/collection/map/mri_map_backend.rb,
lib/concurrent/synchronization/lockable_object.rb,
lib/concurrent/executor/single_thread_executor.rb,
lib/concurrent/thread_safe/util/array_hash_rbx.rb,
lib/concurrent/synchronization/abstract_struct.rb,
lib/concurrent/channel/selector/default_clause.rb,
lib/concurrent/utility/native_extension_loader.rb,
lib/concurrent/thread_safe/util/cheap_lockable.rb,
lib/concurrent/synchronization/abstract_object.rb,
lib/concurrent/atomic/abstract_thread_local_var.rb,
lib/concurrent/executor/serial_executor_service.rb,
lib/concurrent/executor/simple_executor_service.rb,
lib/concurrent/atomic/reentrant_read_write_lock.rb,
lib/concurrent/edge/lock_free_linked_set/window.rb,
lib/concurrent/actor/behaviour/executes_context.rb,
lib/concurrent/actor/default_dead_letter_handler.rb,
lib/concurrent/thread_safe/util/xor_shift_random.rb,
lib/concurrent/thread_safe/synchronized_delegator.rb,
lib/concurrent/executor/ruby_thread_pool_executor.rb,
lib/concurrent/executor/java_thread_pool_executor.rb,
lib/concurrent/executor/abstract_executor_service.rb,
lib/concurrent/synchronization/rbx_lockable_object.rb,
lib/concurrent/thread_safe/util/power_of_two_tuple.rb,
lib/concurrent/synchronization/mri_lockable_object.rb,
lib/concurrent/atomic_reference/numeric_cas_wrapper.rb,
lib/concurrent/executor/ruby_single_thread_executor.rb,
lib/concurrent/executor/indirect_immediate_executor.rb,
lib/concurrent/executor/java_single_thread_executor.rb,
lib/concurrent/synchronization/jruby_lockable_object.rb,
lib/concurrent/collection/copy_on_write_observer_set.rb,
lib/concurrent/collection/copy_on_notify_observer_set.rb,
lib/concurrent/executor/serialized_execution_delegator.rb,
lib/concurrent/synchronization/truffle_lockable_object.rb,
lib/concurrent/collection/map/synchronized_map_backend.rb,
lib/concurrent/collection/non_concurrent_priority_queue.rb,
lib/concurrent/atomic_reference/concurrent_update_error.rb,
lib/concurrent/synchronization/abstract_lockable_object.rb,
lib/concurrent/collection/map/non_concurrent_map_backend.rb,
lib/concurrent/actor/behaviour/errors_on_unknown_message.rb,
lib/concurrent/collection/map/atomic_reference_map_backend.rb,
lib/concurrent/collection/java_non_concurrent_priority_queue.rb,
lib/concurrent/collection/ruby_non_concurrent_priority_queue.rb,
lib/concurrent/edge/promises.rb
Overview
Modern concurrency tools for Ruby. Inspired by Erlang, Clojure, Scala, Haskell, F#, C#, Java, and classic concurrency patterns.
The design goals of this gem are:
- Stay true to the spirit of the languages providing inspiration
- But implement in a way that makes sense for Ruby
- Keep the semantics as idiomatic Ruby as possible
- Support features that make sense in Ruby
- Exclude features that don't make sense in Ruby
- Be small, lean, and loosely coupled
Defined Under Namespace
Modules: Actor, Async, Concern, Edge, ImmutableStruct, MutableStruct, Promises, SettableStruct, Synchronization, Utility Classes: Agent, Array, Atom, AtomicBoolean, AtomicFixnum, AtomicReference, CachedThreadPool, Cancellation, Channel, ConcurrentUpdateError, CountDownLatch, CyclicBarrier, Delay, Event, Exchanger, FixedThreadPool, Future, Hash, IVar, ImmediateExecutor, IndirectImmediateExecutor, JavaThreadPoolExecutor, LazyRegister, MVar, Map, Maybe, MultipleAssignmentError, MultipleErrors, ProcessingActor, Promise, ReadWriteLock, ReentrantReadWriteLock, SafeTaskExecutor, ScheduledTask, Semaphore, SerializedExecution, SerializedExecutionDelegator, SimpleExecutorService, SingleThreadExecutor, TVar, ThreadLocalVar, ThreadPoolExecutor, Throttle, TimerSet, TimerTask, Transaction, Tuple
Constant Summary
- Error =
Class.new(StandardError)
- ConfigurationError =
Raised when errors occur during configuration.
Class.new(Error)
- CancelledOperationError =
Raised when an asynchronous operation is cancelled before execution.
Class.new(Error)
- LifecycleError =
Raised when a lifecycle method (such as
stop
) is called in an improper sequence or when the object is in an inappropriate state. Class.new(Error)
- ImmutabilityError =
Raised when an attempt is made to violate an immutability guarantee.
Class.new(Error)
- IllegalOperationError =
Raised when an operation is attempted which is not legal given the receiver's current state
Class.new(Error)
- InitializationError =
Raised when an object's methods are called when it has not been properly initialized.
Class.new(Error)
- MaxRestartFrequencyError =
Raised when an object with a start/stop lifecycle has been started an excessive number of times. Often used in conjunction with a restart policy or strategy.
Class.new(Error)
- RejectedExecutionError =
Raised by an
Executor
when it is unable to process a given task, possibly because of a reject policy or other internal error. Class.new(Error)
- ResourceLimitError =
Raised when any finite resource, such as a lock counter, exceeds its maximum limit/threshold.
Class.new(Error)
- TimeoutError =
Raised when an operation times out.
Class.new(Error)
- VERSION =
'1.0.5'
- EDGE_VERSION =
'0.3.1'
- PromiseExecutionError =
Class.new(StandardError)
- NULL_LOGGER =
Suppresses all output when used for logging.
lambda { |level, progname, message = nil, &block| }
Class Method Summary (collapse)
-
+ (undocumented) abort_transaction
Abort a currently running transaction - see
Concurrent::atomically
. - + (Boolean) allow_c_extensions? extended from Utility::NativeExtensionLoader
-
+ (undocumented) atomically
Run a block that reads and writes
TVar
s as a single atomic transaction. - + (Boolean) c_extensions_loaded? extended from Utility::NativeExtensionLoader
- + (undocumented) call_dataflow(method, executor, *inputs, &block)
-
+ (Logger) create_simple_logger(level = Logger::FATAL, output = $stderr)
Logger with provided level and output.
- + (Logger) create_stdlib_logger(level = Logger::FATAL, output = $stderr) deprecated Deprecated.
-
+ (Object) dataflow(*inputs) {|inputs| ... }
Dataflow allows you to create a task that will be scheduled when all of its data dependencies are available. Data dependencies are
Future
values. The dataflow task itself is also a Future
value, so you can build up a graph of these tasks, each of which is run when all the data and other tasks it depends on are available or completed. Our syntax is somewhat related to that of Akka's
flow
and Habanero Java's DataDrivenFuture
. However unlike Akka we don't schedule a task at all until it is ready to run, and unlike Habanero Java we pass the data values into the task instead of dereferencing them again in the task. The theory of dataflow goes back to the 70s. In the terminology of the literature, our implementation is coarse-grained, in that each task can be many instructions, and dynamic in that you can create more tasks within other tasks.
Example
A dataflow task is created with the
dataflow
method, passing in a block. task = Concurrent::dataflow { 14 }
This produces a simple
Future
value. The task will run immediately, as it has no dependencies. We can also specify Future
values that must be available before a task will run. When we do this we get the value of those futures passed to our block. a = Concurrent::dataflow { 1 } b = Concurrent::dataflow { 2 } c = Concurrent::dataflow(a, b) { |av, bv| av + bv }
Using the
dataflow
method you can build up a directed acyclic graph (DAG) of tasks that depend on each other, and have the tasks run as soon as their dependencies are ready and there is CPU capacity to schedule them. This can help you create a program that uses more of the CPU resources available to you. Derivation
This section describes how we could derive dataflow from other primitives in this library.
Consider a naive fibonacci calculator.
def fib(n) if n < 2 n else fib(n - 1) + fib(n - 2) end end puts fib(14) #=> 377
We could modify this to use futures.
def fib(n) if n < 2 Concurrent::Future.new { n } else n1 = fib(n - 1).execute n2 = fib(n - 2).execute Concurrent::Future.new { n1.value + n2.value } end end f = fib(14) #=> #<Concurrent::Future:0x000001019ef5a0 ... f.execute #=> #<Concurrent::Future:0x000001019ef5a0 ... sleep(0.5) puts f.value #=> 377
One of the drawbacks of this approach is that all the futures start, and then most of them immediately block on their dependencies. We know that there's no point executing those futures until their dependencies are ready, so let's not execute each future until all their dependencies are ready.
To do this we'll create an object that counts the number of times it observes a future finishing before it does something - and for us that something will be to execute the next future.
class CountingObserver def initialize(count, &block) @count = count @block = block end def update(time, value, reason) @count -= 1 if @count <= 0 @block.call() end end end def fib(n) if n < 2 Concurrent::Future.new { n }.execute else n1 = fib(n - 1) n2 = fib(n - 2) result = Concurrent::Future.new { n1.value + n2.value } barrier = CountingObserver.new(2) { result.execute } n1.add_observer barrier n2.add_observer barrier n1.execute n2.execute result end end
We can wrap this up in a dataflow utility.
f = fib(14) #=> #<Concurrent::Future:0x00000101fca308 ... sleep(0.5) puts f.value #=> 377 def dataflow(*inputs, &block) result = Concurrent::Future.new(&block) if inputs.empty? result.execute else barrier = CountingObserver.new(inputs.size) { result.execute } inputs.each do |input| input.add_observer barrier end end result end def fib(n) if n < 2 dataflow { n } else n1 = fib(n - 1) n2 = fib(n - 2) dataflow(n1, n2) { n1.value + n2.value } end end f = fib(14) #=> #<Concurrent::Future:0x00000101fca308 ... sleep(0.5) puts f.value #=> 377
Since we know that the futures the dataflow computation depends on are already going to be available when the future is executed, we might as well pass the values into the block so we don't have to reference the futures inside the block. This allows us to write the dataflow block as straight non-concurrent code without reference to futures.
def dataflow(*inputs, &block) result = Concurrent::Future.new do values = inputs.map { |input| input.value } block.call(*values) end if inputs.empty? result.execute else barrier = CountingObserver.new(inputs.size) { result.execute } inputs.each do |input| input.add_observer barrier end end result end def fib(n) if n < 2 Concurrent::dataflow { n } else n1 = fib(n - 1) n2 = fib(n - 2) Concurrent::dataflow(n1, n2) { |v1, v2| v1 + v2 } end end f = fib(14) #=> #<Concurrent::Future:0x000001019a26d8 ... sleep(0.5) puts f.value #=> 377
- + (undocumented) dataflow!(*inputs, &block)
- + (undocumented) dataflow_with(executor, *inputs, &block)
- + (undocumented) dataflow_with!(executor, *inputs, &block)
-
+ (undocumented) disable_at_exit_handlers!
Disables AtExit handlers including pool auto-termination handlers.
-
+ (Executor) executor(executor_identifier)
General access point to global executors.
-
+ (ThreadPoolExecutor) global_fast_executor
Global thread pool optimized for short, fast operations.
- + (undocumented) global_immediate_executor
-
+ (ThreadPoolExecutor) global_io_executor
Global thread pool optimized for long, blocking (IO) tasks.
- + (undocumented) global_logger
- + (undocumented) global_logger=(value)
-
+ (Concurrent::TimerSet) global_timer_set
Global thread pool used for global timers.
- + (Boolean) java_extensions_loaded? extended from Utility::NativeExtensionLoader
-
+ (undocumented) leave_transaction
Leave a transaction without committing or aborting - see
Concurrent::atomically
. - + (undocumented) load_native_extensions extended from Utility::NativeExtensionLoader
-
+ (undocumented) log(level, progname, message = nil, &block)
extended
from Concern::Logging
Logs through Concurrent.global_logger, it can be overridden by setting @logger.
-
+ (Float) monotonic_time
Returns the current time as tracked by the application monotonic clock.
- + (undocumented) new_fast_executor(opts = {})
- + (undocumented) new_io_executor(opts = {})
- + (Boolean) on_cruby? extended from Utility::EngineDetector
- + (Boolean) on_jruby? extended from Utility::EngineDetector
- + (Boolean) on_jruby_9000? extended from Utility::EngineDetector
- + (Boolean) on_linux? extended from Utility::EngineDetector
- + (Boolean) on_osx? extended from Utility::EngineDetector
- + (Boolean) on_rbx? extended from Utility::EngineDetector
- + (Boolean) on_truffle? extended from Utility::EngineDetector
- + (Boolean) on_windows? extended from Utility::EngineDetector
- + (undocumented) physical_processor_count
- + (undocumented) processor_count
- + (undocumented) ruby_engine extended from Utility::EngineDetector
- + (undocumented) ruby_version(comparison, major, minor, patch, version = RUBY_VERSION) extended from Utility::EngineDetector
- + (undocumented) set_c_extensions_loaded extended from Utility::NativeExtensionLoader
- + (undocumented) set_java_extensions_loaded extended from Utility::NativeExtensionLoader
-
+ (undocumented) use_simple_logger(level = Logger::FATAL, output = $stderr)
Use logger created by #create_simple_logger to log concurrent-ruby messages.
- + (undocumented) use_stdlib_logger(level = Logger::FATAL, output = $stderr) deprecated Deprecated.
Class Method Details
+ (undocumented) abort_transaction
Abort a currently running transaction - see Concurrent::atomically
.
150 151 152 |
# File 'lib/concurrent/tvar.rb', line 150 def abort_transaction raise Transaction::AbortError.new end |
+ (Boolean) allow_c_extensions? Originally defined in module Utility::NativeExtensionLoader
+ (undocumented) atomically
Run a block that reads and writes TVar
s as a single atomic transaction.
With respect to the value of TVar
objects, the transaction is atomic, in
that it either happens or it does not, consistent, in that the TVar
objects involved will never enter an illegal state, and isolated, in that
transactions never interfere with each other. You may recognise these
properties from database transactions.
There are some very important and unusual semantics that you must be aware of:
Most importantly, the block that you pass to atomically may be executed more than once. In most cases your code should be free of side-effects, except for via TVar.
If an exception escapes an atomically block it will abort the transaction.
It is undefined behaviour to use callcc or Fiber with atomically.
If you create a new thread within an atomically, it will not be part of the transaction. Creating a thread counts as a side-effect.
Transactions within transactions are flattened to a single transaction.
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/concurrent/tvar.rb', line 93 def atomically raise ArgumentError.new('no block given') unless block_given? # Get the current transaction transaction = Transaction::current # Are we not already in a transaction (not nested)? if transaction.nil? # New transaction begin # Retry loop loop do # Create a new transaction transaction = Transaction.new Transaction::current = transaction # Run the block, aborting on exceptions begin result = yield rescue Transaction::AbortError => e transaction.abort result = Transaction::ABORTED rescue Transaction::LeaveError => e transaction.abort break result rescue => e transaction.abort raise e end # If we can commit, break out of the loop if result != Transaction::ABORTED if transaction.commit break result end end end ensure # Clear the current transaction Transaction::current = nil end else # Nested transaction - flatten it and just run the block yield end end |
+ (Boolean) c_extensions_loaded? Originally defined in module Utility::NativeExtensionLoader
+ (undocumented) call_dataflow(method, executor, *inputs, &block)
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/concurrent/dataflow.rb', line 55 def call_dataflow(method, executor, *inputs, &block) raise ArgumentError.new('an executor must be provided') if executor.nil? raise ArgumentError.new('no block given') unless block_given? unless inputs.all? { |input| input.is_a? IVar } raise ArgumentError.new("Not all dependencies are IVars.\nDependencies: #{ inputs.inspect }") end result = Future.new(executor: executor) do values = inputs.map { |input| input.send(method) } block.call(*values) end if inputs.empty? result.execute else counter = DependencyCounter.new(inputs.size) { result.execute } inputs.each do |input| input.add_observer counter end end result end |
+ (Logger) create_simple_logger(level = Logger::FATAL, output = $stderr)
Returns Logger with provided level and output.
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/concurrent/configuration.rb', line 18 def self.create_simple_logger(level = Logger::FATAL, output = $stderr) # TODO (pitr-ch 24-Dec-2016): figure out why it had to be replaced, stdlogger was deadlocking lambda do |severity, progname, message = nil, &block| return false if severity < level message = block ? block.call : message formatted_message = case message when String message when Exception format "%s (%s)\n%s", message.message, message.class, (message.backtrace || []).join("\n") else message.inspect end output.print format "[%s] %5s -- %s: %s\n", Time.now.strftime('%Y-%m-%d %H:%M:%S.%L'), Logger::SEV_LABEL[severity], progname, formatted_message true end end |
+ (Logger) create_stdlib_logger(level = Logger::FATAL, output = $stderr)
Returns Logger with provided level and output.
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/concurrent/configuration.rb', line 50 def self.create_stdlib_logger(level = Logger::FATAL, output = $stderr) logger = Logger.new(output) logger.level = level logger.formatter = lambda do |severity, datetime, progname, msg| formatted_message = case msg when String msg when Exception format "%s (%s)\n%s", msg.message, msg.class, (msg.backtrace || []).join("\n") else msg.inspect end format "[%s] %5s -- %s: %s\n", datetime.strftime('%Y-%m-%d %H:%M:%S.%L'), severity, progname, formatted_message end lambda do |loglevel, progname, message = nil, &block| logger.add loglevel, message, progname, &block end end |
+ (Object) dataflow(*inputs) {|inputs| ... }
Dataflow allows you to create a task that will be scheduled when all of its data dependencies are available. Data dependencies are Future
values. The dataflow task itself is also a Future
value, so you can build up a graph of these tasks, each of which is run when all the data and other tasks it depends on are available or completed.
Our syntax is somewhat related to that of Akka's flow
and Habanero Java's DataDrivenFuture
. However unlike Akka we don't schedule a task at all until it is ready to run, and unlike Habanero Java we pass the data values into the task instead of dereferencing them again in the task.
The theory of dataflow goes back to the 70s. In the terminology of the literature, our implementation is coarse-grained, in that each task can be many instructions, and dynamic in that you can create more tasks within other tasks.
Example
A dataflow task is created with the dataflow
method, passing in a block.
task = Concurrent::dataflow { 14 }
This produces a simple Future
value. The task will run immediately, as it has no dependencies. We can also specify Future
values that must be available before a task will run. When we do this we get the value of those futures passed to our block.
a = Concurrent::dataflow { 1 }
b = Concurrent::dataflow { 2 }
c = Concurrent::dataflow(a, b) { |av, bv| av + bv }
Using the dataflow
method you can build up a directed acyclic graph (DAG) of tasks that depend on each other, and have the tasks run as soon as their dependencies are ready and there is CPU capacity to schedule them. This can help you create a program that uses more of the CPU resources available to you.
Derivation
This section describes how we could derive dataflow from other primitives in this library.
Consider a naive fibonacci calculator.
def fib(n)
if n < 2
n
else
fib(n - 1) + fib(n - 2)
end
end
puts fib(14) #=> 377
We could modify this to use futures.
def fib(n)
if n < 2
Concurrent::Future.new { n }
else
n1 = fib(n - 1).execute
n2 = fib(n - 2).execute
Concurrent::Future.new { n1.value + n2.value }
end
end
f = fib(14) #=> #<Concurrent::Future:0x000001019ef5a0 ...
f.execute #=> #<Concurrent::Future:0x000001019ef5a0 ...
sleep(0.5)
puts f.value #=> 377
One of the drawbacks of this approach is that all the futures start, and then most of them immediately block on their dependencies. We know that there's no point executing those futures until their dependencies are ready, so let's not execute each future until all their dependencies are ready.
To do this we'll create an object that counts the number of times it observes a future finishing before it does something - and for us that something will be to execute the next future.
class CountingObserver
def initialize(count, &block)
@count = count
@block = block
end
def update(time, value, reason)
@count -= 1
if @count <= 0
@block.call()
end
end
end
def fib(n)
if n < 2
Concurrent::Future.new { n }.execute
else
n1 = fib(n - 1)
n2 = fib(n - 2)
result = Concurrent::Future.new { n1.value + n2.value }
barrier = CountingObserver.new(2) { result.execute }
n1.add_observer barrier
n2.add_observer barrier
n1.execute
n2.execute
result
end
end
We can wrap this up in a dataflow utility.
f = fib(14) #=> #<Concurrent::Future:0x00000101fca308 ...
sleep(0.5)
puts f.value #=> 377
def dataflow(*inputs, &block)
result = Concurrent::Future.new(&block)
if inputs.empty?
result.execute
else
barrier = CountingObserver.new(inputs.size) { result.execute }
inputs.each do |input|
input.add_observer barrier
end
end
result
end
def fib(n)
if n < 2
dataflow { n }
else
n1 = fib(n - 1)
n2 = fib(n - 2)
dataflow(n1, n2) { n1.value + n2.value }
end
end
f = fib(14) #=> #<Concurrent::Future:0x00000101fca308 ...
sleep(0.5)
puts f.value #=> 377
Since we know that the futures the dataflow computation depends on are already going to be available when the future is executed, we might as well pass the values into the block so we don't have to reference the futures inside the block. This allows us to write the dataflow block as straight non-concurrent code without reference to futures.
def dataflow(*inputs, &block)
result = Concurrent::Future.new do
values = inputs.map { |input| input.value }
block.call(*values)
end
if inputs.empty?
result.execute
else
barrier = CountingObserver.new(inputs.size) { result.execute }
inputs.each do |input|
input.add_observer barrier
end
end
result
end
def fib(n)
if n < 2
Concurrent::dataflow { n }
else
n1 = fib(n - 1)
n2 = fib(n - 2)
Concurrent::dataflow(n1, n2) { |v1, v2| v1 + v2 }
end
end
f = fib(14) #=> #<Concurrent::Future:0x000001019a26d8 ...
sleep(0.5)
puts f.value #=> 377
33 34 35 |
# File 'lib/concurrent/dataflow.rb', line 33 def dataflow(*inputs, &block) dataflow_with(Concurrent.global_io_executor, *inputs, &block) end |
+ (undocumented) dataflow!(*inputs, &block)
43 44 45 |
# File 'lib/concurrent/dataflow.rb', line 43 def dataflow!(*inputs, &block) dataflow_with!(Concurrent.global_io_executor, *inputs, &block) end |
+ (undocumented) dataflow_with(executor, *inputs, &block)
38 39 40 |
# File 'lib/concurrent/dataflow.rb', line 38 def dataflow_with(executor, *inputs, &block) call_dataflow(:value, executor, *inputs, &block) end |
+ (undocumented) dataflow_with!(executor, *inputs, &block)
48 49 50 |
# File 'lib/concurrent/dataflow.rb', line 48 def dataflow_with!(executor, *inputs, &block) call_dataflow(:value!, executor, *inputs, &block) end |
+ (undocumented) disable_at_exit_handlers!
this option should be needed only because of at_exit
ordering
issues which may arise when running some of the testing frameworks.
E.g. Minitest's test-suite runs itself in at_exit
callback which
executes after the pools are already terminated. Then auto termination
needs to be disabled and called manually after test-suite ends.
This method should never be called from within a gem. It should only be used from within the main application and even then it should be used only when necessary.
Disables AtExit handlers including pool auto-termination handlers. When disabled it will be the application programmer's responsibility to ensure that the handlers are shutdown properly prior to application exit by calling AtExitImplementation.new.install.run method.
128 129 130 |
# File 'lib/concurrent/configuration.rb', line 128 def self.disable_at_exit_handlers! AtExit.enabled = false end |
+ (Executor) executor(executor_identifier)
General access point to global executors.
163 164 165 |
# File 'lib/concurrent/configuration.rb', line 163 def self.executor(executor_identifier) Options.executor(executor_identifier) end |
+ (ThreadPoolExecutor) global_fast_executor
Global thread pool optimized for short, fast operations.
135 136 137 |
# File 'lib/concurrent/configuration.rb', line 135 def self.global_fast_executor GLOBAL_FAST_EXECUTOR.value end |
+ (undocumented) global_immediate_executor
146 147 148 |
# File 'lib/concurrent/configuration.rb', line 146 def self.global_immediate_executor GLOBAL_IMMEDIATE_EXECUTOR end |
+ (ThreadPoolExecutor) global_io_executor
Global thread pool optimized for long, blocking (IO) tasks.
142 143 144 |
# File 'lib/concurrent/configuration.rb', line 142 def self.global_io_executor GLOBAL_IO_EXECUTOR.value end |
+ (undocumented) global_logger
90 91 92 |
# File 'lib/concurrent/configuration.rb', line 90 def self.global_logger GLOBAL_LOGGER.value end |
+ (undocumented) global_logger=(value)
94 95 96 |
# File 'lib/concurrent/configuration.rb', line 94 def self.global_logger=(value) GLOBAL_LOGGER.value = value end |
+ (Concurrent::TimerSet) global_timer_set
Global thread pool user for global timers.
153 154 155 |
# File 'lib/concurrent/configuration.rb', line 153 def self.global_timer_set GLOBAL_TIMER_SET.value end |
+ (Boolean) java_extensions_loaded? Originally defined in module Utility::NativeExtensionLoader
+ (undocumented) leave_transaction
Leave a transaction without committing or aborting - see Concurrent::atomically
.
155 156 157 |
# File 'lib/concurrent/tvar.rb', line 155 def leave_transaction raise Transaction::LeaveError.new end |
+ (undocumented) load_native_extensions Originally defined in module Utility::NativeExtensionLoader
+ (undocumented) log(level, progname, message = nil, &block) Originally defined in module Concern::Logging
Logs through Concurrent.global_logger, it can be overridden by setting @logger
+ (Float) monotonic_time
Time calculations on all platforms and languages are sensitive to changes to the system clock. To alleviate the potential problems associated with changing the system clock while an application is running, most modern operating systems provide a monotonic clock that operates independently of the system clock. A monotonic clock cannot be used to determine human-friendly clock times. A monotonic clock is used exclusively for calculating time intervals. Not all Ruby platforms provide access to an operating system monotonic clock. On these platforms a pure-Ruby monotonic clock will be used as a fallback. An operating system monotonic clock is both faster and more reliable than the pure-Ruby implementation. The pure-Ruby implementation should be fast and reliable enough for most non-realtime operations. At this time the common Ruby platforms that provide access to an operating system monotonic clock are MRI 2.1 and above and JRuby (all versions).
Returns the current time as tracked by the application monotonic clock.
53 54 55 |
# File 'lib/concurrent/utility/monotonic_time.rb', line 53 def monotonic_time GLOBAL_MONOTONIC_CLOCK.get_time end |
+ (undocumented) new_fast_executor(opts = {})
167 168 169 170 171 172 173 174 175 |
# File 'lib/concurrent/configuration.rb', line 167 def self.new_fast_executor(opts = {}) FixedThreadPool.new( [2, Concurrent.processor_count].max, auto_terminate: opts.fetch(:auto_terminate, true), idletime: 60, # 1 minute max_queue: 0, # unlimited fallback_policy: :abort # shouldn't matter -- 0 max queue ) end |
+ (undocumented) new_io_executor(opts = {})
177 178 179 180 181 182 183 184 185 186 187 |
# File 'lib/concurrent/configuration.rb', line 177 def self.new_io_executor(opts = {}) ThreadPoolExecutor.new( min_threads: [2, Concurrent.processor_count].max, max_threads: ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE, # max_threads: 1000, auto_terminate: opts.fetch(:auto_terminate, true), idletime: 60, # 1 minute max_queue: 0, # unlimited fallback_policy: :abort # shouldn't matter -- 0 max queue ) end |
+ (Boolean) on_cruby? Originally defined in module Utility::EngineDetector
+ (Boolean) on_jruby? Originally defined in module Utility::EngineDetector
+ (Boolean) on_jruby_9000? Originally defined in module Utility::EngineDetector
+ (Boolean) on_linux? Originally defined in module Utility::EngineDetector
+ (Boolean) on_osx? Originally defined in module Utility::EngineDetector
+ (Boolean) on_rbx? Originally defined in module Utility::EngineDetector
+ (Boolean) on_truffle? Originally defined in module Utility::EngineDetector
+ (Boolean) on_windows? Originally defined in module Utility::EngineDetector
+ (undocumented) physical_processor_count
157 158 159 |
# File 'lib/concurrent/utility/processor_counter.rb', line 157 def self.physical_processor_count processor_counter.physical_processor_count end |
+ (undocumented) processor_count
153 154 155 |
# File 'lib/concurrent/utility/processor_counter.rb', line 153 def self.processor_count processor_counter.processor_count end |
+ (undocumented) ruby_engine Originally defined in module Utility::EngineDetector
+ (undocumented) ruby_version(comparison, major, minor, patch, version = RUBY_VERSION) Originally defined in module Utility::EngineDetector
+ (undocumented) set_c_extensions_loaded Originally defined in module Utility::NativeExtensionLoader
+ (undocumented) set_java_extensions_loaded Originally defined in module Utility::NativeExtensionLoader
+ (undocumented) use_simple_logger(level = Logger::FATAL, output = $stderr)
Use logger created by #create_simple_logger to log concurrent-ruby messages.
44 45 46 |
# File 'lib/concurrent/configuration.rb', line 44 def self.use_simple_logger(level = Logger::FATAL, output = $stderr) Concurrent.global_logger = create_simple_logger level, output end |
+ (undocumented) use_stdlib_logger(level = Logger::FATAL, output = $stderr)
Use logger created by #create_stdlib_logger to log concurrent-ruby messages.
77 78 79 |
# File 'lib/concurrent/configuration.rb', line 77 def self.use_stdlib_logger(level = Logger::FATAL, output = $stderr) Concurrent.global_logger = create_stdlib_logger level, output end |