1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258
|
require 'set'
require 'concurrent/synchronization'
module Concurrent
# A `TVar` is a transactional variable - a single-element container that
# is used as part of a transaction - see `Concurrent::atomically`.
#
# @!macro thread_safe_variable_comparison
#
# {include:file:docs-source/tvar.md}
class TVar < Synchronization::Object
  safe_initialization!

  # Build a `TVar` holding `value`. The variable starts at version 0 and
  # gets a `Mutex` of its own.
  def initialize(value)
    @value, @version, @lock = value, 0, Mutex.new
  end

  # Read the current value. The read goes through `Concurrent::atomically`
  # and is recorded by the current transaction.
  def value
    Concurrent.atomically { Transaction.current.read(self) }
  end

  # Assign a new value. The write goes through `Concurrent::atomically`
  # and is recorded by the current transaction.
  def value=(value)
    Concurrent.atomically { Transaction.current.write(self, value) }
  end

  # Raw read of the stored value, without going through a transaction.
  # @!visibility private
  def unsafe_value # :nodoc:
    @value
  end

  # Raw overwrite of the stored value, without going through a transaction.
  # @!visibility private
  def unsafe_value=(value) # :nodoc:
    @value = value
  end

  # Raw read of the version counter.
  # @!visibility private
  def unsafe_version # :nodoc:
    @version
  end

  # Advance the version counter by one and return the new version.
  # @!visibility private
  def unsafe_increment_version # :nodoc:
    @version = @version.succ
  end

  # The `Mutex` created for this variable in `initialize`.
  # @!visibility private
  def unsafe_lock # :nodoc:
    @lock
  end
end
# Run a block that reads and writes `TVar`s as a single atomic transaction.
# With respect to the value of `TVar` objects, the transaction is atomic, in
# that it either happens or it does not, consistent, in that the `TVar`
# objects involved will never enter an illegal state, and isolated, in that
# transactions never interfere with each other. You may recognise these
# properties from database transactions.
#
# There are some very important and unusual semantics that you must be aware of:
#
# * Most importantly, the block that you pass to atomically may be executed
#   more than once. In most cases your code should be free of
#   side-effects, except for via TVar.
#
# * If the block is left in any way other than by finishing normally (an
#   exception of any class, a `throw`, a non-local exit), the transaction is
#   aborted and nothing is committed.
#
# * It is undefined behaviour to use callcc or Fiber with atomically.
#
# * If you create a new thread within an atomically, it will not be part of
#   the transaction. Creating a thread counts as a side-effect.
#
# Transactions within transactions are flattened to a single transaction.
#
# @example
#   a = Concurrent::TVar.new(100_000)
#   b = Concurrent::TVar.new(100)
#
#   Concurrent::atomically do
#     a.value -= 10
#     b.value += 10
#   end
#
# @yield the transaction body; it may run several times before it commits
# @return [Object, nil] the block's value from the attempt that committed,
#   or `nil` when the block called `leave_transaction`
# @raise [ArgumentError] if no block is given
def atomically
  raise ArgumentError, 'no block given' unless block_given?

  # Nested call: the enclosing `atomically` already owns the retry loop, so
  # the block simply runs as part of that transaction.
  return yield unless Transaction.current.nil?

  begin
    # Retry loop: every pass is a fresh attempt with its own transaction.
    loop do
      transaction = Transaction.new
      Transaction.current = transaction
      completed = false
      left = false

      begin
        result = yield
        completed = true
      rescue Transaction::AbortError
        # Explicit abort or detected conflict: mark the attempt for a retry.
        result = Transaction::ABORTED
      rescue Transaction::LeaveError
        left = true
      ensure
        # Runs for every non-normal exit, including exceptions that are not
        # StandardError and `throw`, so locks taken by this attempt are
        # always released. Exactly one abort per unfinished attempt.
        transaction.abort unless completed
      end

      # `leave_transaction`: stop without retrying; the loop yields nil.
      break if left

      # Identity check so a result with a custom `==` cannot be mistaken
      # for (or hide) the sentinel.
      next if Transaction::ABORTED.equal?(result)

      # A failed commit means another transaction got in first: try again.
      break result if transaction.commit
    end
  ensure
    # Clear the current transaction
    Transaction.current = nil
  end
end
# Abort the transaction that is currently running by raising
# `Transaction::AbortError` - see `Concurrent::atomically`, which rescues
# that error.
def abort_transaction
  raise Transaction::AbortError
end
# Leave the transaction that is currently running by raising
# `Transaction::LeaveError` - see `Concurrent::atomically`, which rescues
# that error and returns without committing.
def leave_transaction
  raise Transaction::LeaveError
end
# Make the three transaction helpers callable as `Concurrent.atomically`,
# `Concurrent.abort_transaction` and `Concurrent.leave_transaction`.
module_function :atomically, :abort_transaction, :leave_transaction
# NOTE(review): a bare `private` only affects methods defined after it in this
# module body; it does not hide the `Transaction` constant defined below.
private
# Book-keeping for one attempt at running an `atomically` block. It records
# the version of every `TVar` it reads, buffers every write, and holds the
# lock of each `TVar` it has written until it is committed or aborted.
#
# A transaction is finished exactly once: either by `commit` (which releases
# the locks whether it succeeds or not) or by `abort`.
class Transaction
  # Sentinel used by `atomically` as the block result of an aborted attempt.
  ABORTED = ::Object.new

  # A `TVar` paired with the version it had when this transaction read it.
  ReadLogEntry = Struct.new(:tvar, :version)

  # Raised by `Concurrent::abort_transaction`.
  AbortError = Class.new(StandardError)

  # Raised by `Concurrent::leave_transaction`.
  LeaveError = Class.new(StandardError)

  def initialize
    @read_log = []
    @write_log = {}
  end

  # Read `tvar` as seen by this transaction.
  #
  # @return the value buffered by an earlier `write` in this transaction, or
  #   else the variable's stored value (the read is then logged with the
  #   version observed)
  # @raise [AbortError] if an earlier read has become stale
  def read(tvar)
    Concurrent.abort_transaction unless valid?
    return @write_log[tvar] if @write_log.key?(tvar)

    @read_log.push(ReadLogEntry.new(tvar, tvar.unsafe_version))
    tvar.unsafe_value
  end

  # Buffer `value` as the new content of `tvar`; nothing is stored in the
  # `TVar` itself until `commit`. The first write to a given `TVar` takes
  # its lock.
  #
  # @return the value written
  # @raise [AbortError] if the lock is held elsewhere, or if this
  #   transaction read `tvar` earlier and its version has moved on since
  def write(tvar, value)
    acquire(tvar) unless @write_log.key?(tvar)

    # Record the value written
    @write_log[tvar] = value
  end

  # Give up the attempt: release the locks of all written `TVar`s without
  # storing anything.
  def abort
    unlock
  end

  # Try to publish the buffered writes.
  #
  # @return [Boolean] true when every logged read was still current and the
  #   writes were stored (bumping each written `TVar`'s version); false when
  #   validation failed and nothing was stored. The locks are released in
  #   both cases.
  def commit
    unless valid?
      # Release the locks on failure too; otherwise the retry started by
      # `atomically` could never lock these variables again.
      unlock
      return false
    end

    @write_log.each_pair do |tvar, value|
      tvar.unsafe_value = value
      tvar.unsafe_increment_version
    end

    unlock
    true
  end

  # Whether every logged read is still current. Entries for `TVar`s that
  # this transaction has written are skipped: their version was checked when
  # the lock was taken in `write`.
  def valid?
    @read_log.none? do |entry|
      !@write_log.key?(entry.tvar) && entry.tvar.unsafe_version > entry.version
    end
  end

  # Release the lock of every `TVar` in the write log.
  def unlock
    @write_log.each_key { |tvar| tvar.unsafe_lock.unlock }
  end

  # The transaction stored in `Thread.current` for the calling code, or nil.
  def self.current
    Thread.current[:current_tvar_transaction]
  end

  # Store (or, with nil, clear) the transaction in `Thread.current`.
  def self.current=(transaction)
    Thread.current[:current_tvar_transaction] = transaction
  end

  private

  # Take the lock of `tvar` for a first write, aborting when that is not
  # possible or not safe.
  def acquire(tvar)
    lock = tvar.unsafe_lock

    # Someone else is writing to this TVar - abort
    Concurrent.abort_transaction unless lock.try_lock

    # If we previously read from it, check the version hasn't changed
    return unless stale_read?(tvar)

    # The TVar is not in the write log yet, so `abort` would not release
    # this lock; hand it back here before aborting.
    lock.unlock
    Concurrent.abort_transaction
  end

  # Whether `tvar` was read by this transaction at a version older than the
  # one it has now.
  def stale_read?(tvar)
    @read_log.any? do |entry|
      entry.tvar == tvar && tvar.unsafe_version > entry.version
    end
  end
end
end
|