
|
require 'set'
require 'concurrent/synchronization'
module Concurrent
# A `TVar` is a transactional variable - a single-element container that
# is used as part of a transaction - see `Concurrent::atomically`.
#
# @!macro thread_safe_variable_comparison
#
# {include:file:docs-source/tvar.md}
class TVar < Synchronization::Object
  safe_initialization!

  # Build a `TVar` that starts out holding `value`.
  #
  # Besides the value, each `TVar` carries a version counter (starting at 0)
  # and a `Mutex`; both are consumed by `Transaction` through the `unsafe_*`
  # accessors below.
  def initialize(value)
    @lock = Mutex.new
    @version = 0
    @value = value
  end

  # Read the current value from inside a transaction (a new one is started
  # by `Concurrent::atomically` when none is running).
  def value
    Concurrent.atomically { Transaction.current.read(self) }
  end

  # Write a new value from inside a transaction (a new one is started by
  # `Concurrent::atomically` when none is running).
  def value=(value)
    Concurrent.atomically { Transaction.current.write(self, value) }
  end

  # Raw read of the stored value, bypassing any transaction.
  # @!visibility private
  def unsafe_value # :nodoc:
    @value
  end

  # Raw write of the stored value, bypassing any transaction.
  # @!visibility private
  def unsafe_value=(value) # :nodoc:
    @value = value
  end

  # Current value of the version counter.
  # @!visibility private
  def unsafe_version # :nodoc:
    @version
  end

  # Bump the version counter by one and return the new version.
  # @!visibility private
  def unsafe_increment_version # :nodoc:
    @version = @version + 1
  end

  # The `Mutex` associated with this `TVar`.
  # @!visibility private
  def unsafe_lock # :nodoc:
    @lock
  end
end
# Run a block that reads and writes `TVar`s as a single atomic transaction.
# With respect to the value of `TVar` objects, the transaction is atomic, in
# that it either happens or it does not, consistent, in that the `TVar`
# objects involved will never enter an illegal state, and isolated, in that
# transactions never interfere with each other. You may recognise these
# properties from database transactions.
#
# There are some very important and unusual semantics that you must be aware of:
#
# * Most importantly, the block that you pass to atomically may be executed
#   more than once. In most cases your code should be free of
#   side-effects, except for via TVar.
#
# * If an exception escapes an atomically block it will abort the transaction.
#   The same applies when the block is left non-locally (`return`, `break`,
#   `throw`): the transaction is aborted, nothing is committed and any locks
#   it holds are released.
#
# * It is undefined behaviour to use callcc or Fiber with atomically.
#
# * If you create a new thread within an atomically, it will not be part of
#   the transaction. Creating a thread counts as a side-effect.
#
# Transactions within transactions are flattened to a single transaction.
#
# @example
#   a = Concurrent::TVar.new(100_000)
#   b = Concurrent::TVar.new(100)
#
#   Concurrent::atomically do
#     a.value -= 10
#     b.value += 10
#   end
#
# @yield the body of the transaction; may be run several times
# @return [Object, nil] the value of the block from the attempt that
#   committed, or nil when the block called `leave_transaction`
# @raise [ArgumentError] if no block is given
def atomically
  raise ArgumentError.new('no block given') unless block_given?

  # Nested transaction - flatten it and just run the block
  return yield unless Transaction::current.nil?

  begin
    # Retry loop: every attempt runs against a brand-new transaction
    loop do
      transaction = Transaction.new
      Transaction::current = transaction
      block_completed = false

      begin
        result = yield
        block_completed = true
      rescue Transaction::AbortError
        # Explicit abort (or a conflict detected by the transaction): retry
        result = Transaction::ABORTED
      rescue Transaction::LeaveError
        # Give up without committing and without retrying
        break nil
      ensure
        # Runs for AbortError/LeaveError, for every other exception (including
        # non-StandardError ones) and for non-local exits such as `throw` or
        # `return`, so the transaction never keeps its locks after the block
        # failed to finish.
        transaction.abort unless block_completed
      end

      # If we can commit, break out of the loop; otherwise go round again
      if result != Transaction::ABORTED && transaction.commit
        break result
      end
    end
  ensure
    # Clear the current transaction
    Transaction::current = nil
  end
end
# Abort the transaction that is currently running by raising
# `Transaction::AbortError`; `Concurrent::atomically` rescues it, aborts the
# transaction and runs the block again.
def abort_transaction
  raise Transaction::AbortError
end
# Leave the transaction that is currently running by raising
# `Transaction::LeaveError`; `Concurrent::atomically` rescues it, aborts the
# transaction and returns nil without committing and without retrying.
def leave_transaction
  raise Transaction::LeaveError
end
# Make the transaction helpers callable directly on the module
# (e.g. `Concurrent::atomically`), which is how the rest of this file uses them.
module_function :atomically, :abort_transaction, :leave_transaction
# NOTE(review): a bare `private` only affects methods defined after it; it does
# not restrict access to constants such as the `Transaction` class below.
private
# Bookkeeping for one attempt at running an `atomically` block.
#
# Reads are recorded in a read log together with the version of the `TVar`
# that was observed; writes are buffered in a write log and only applied to
# the `TVar`s on `commit`. A `TVar`'s lock is taken (non-blocking) on the
# first write to it and held until the transaction commits or aborts.
class Transaction
  # Sentinel used by `atomically` to mark an attempt that was aborted.
  ABORTED = ::Object.new

  ReadLogEntry = Struct.new(:tvar, :version)

  AbortError = Class.new(StandardError)
  LeaveError = Class.new(StandardError)

  def initialize
    @read_log = []
    @write_log = {}
    # Every lock this transaction currently holds. Tracked separately from
    # the write log so that a lock acquired just before an abort is released
    # too, and so that releasing is idempotent.
    @held_locks = []
  end

  # Read `tvar` within this transaction.
  #
  # @return the value buffered by an earlier `write` in this transaction, or
  #   otherwise the `TVar`'s stored value (the read is logged with its version)
  # @raise [AbortError] if an earlier read has been invalidated
  def read(tvar)
    Concurrent::abort_transaction unless valid?

    if @write_log.key?(tvar)
      @write_log[tvar]
    else
      @read_log.push(ReadLogEntry.new(tvar, tvar.unsafe_version))
      tvar.unsafe_value
    end
  end

  # Buffer `value` as the new value of `tvar`; it is applied on `commit`.
  #
  # @return the value written
  # @raise [AbortError] if the `TVar` is locked by someone else, or if it
  #   changed since this transaction read it
  def write(tvar, value)
    # Have we already written to this TVar?
    unless @write_log.key?(tvar)
      lock = tvar.unsafe_lock

      # Try to lock the TVar; someone else is writing to it if that fails
      Concurrent::abort_transaction unless lock.try_lock

      # Register the lock before anything else can raise, so that `abort`
      # releases it even though the TVar is not in the write log yet.
      @held_locks << lock

      # If we previously read it, check the version hasn't changed
      stale = @read_log.any? do |log_entry|
        log_entry.tvar == tvar && tvar.unsafe_version > log_entry.version
      end
      Concurrent::abort_transaction if stale
    end

    # Record the value written
    @write_log[tvar] = value
  end

  # Discard the transaction and release every lock it holds.
  def abort
    unlock
  end

  # Apply the buffered writes if every logged read is still valid.
  #
  # The locks are released in both outcomes; a failed commit must not keep
  # them, otherwise the retry could never lock the same `TVar`s again.
  #
  # @return [Boolean] true when the writes were applied, false otherwise
  def commit
    unless valid?
      unlock
      return false
    end

    @write_log.each_pair do |tvar, value|
      tvar.unsafe_value = value
      tvar.unsafe_increment_version
    end

    unlock

    true
  end

  # Whether every `TVar` that was read (and not also written by this
  # transaction) still has the version that was observed.
  def valid?
    @read_log.each do |log_entry|
      next if @write_log.key?(log_entry.tvar)

      return false if log_entry.tvar.unsafe_version > log_entry.version
    end

    true
  end

  # Release all held locks. Safe to call more than once: each lock is
  # removed from the list before it is unlocked.
  def unlock
    @held_locks.shift.unlock until @held_locks.empty?
  end

  # The transaction running on the current thread, or nil.
  def self.current
    Thread.current[:current_tvar_transaction]
  end

  # Set (or clear, with nil) the transaction of the current thread.
  def self.current=(transaction)
    Thread.current[:current_tvar_transaction] = transaction
  end
end
end
|