1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
|
require 'fiber'
require 'concurrent/utility/engine'
require 'concurrent/constants'
module Concurrent
# @!visibility private
# @!macro internal_implementation_note
#
# An abstract implementation of local storage, with sub-classes for
# per-thread and per-fiber locals.
#
# Each execution context (EC, thread or fiber) has a lazily initialized array
# of local variable values. Each time a new local variable is created, we
# allocate an "index" for it.
#
# For example, if the allocated index is 1, that means slot #1 in EVERY EC's
# locals array will be used for the value of that variable.
#
# The good thing about using a per-EC structure to hold values, rather than
# a global, is that no synchronization is needed when reading and writing
# those values (since the structure is only ever accessed by a single
# thread).
#
# Of course, when a local variable is GC'd, 1) we need to recover its index
# for use by other new local variables (otherwise the locals arrays could
# get bigger and bigger with time), and 2) we need to null out all the
# references held in the now-unused slots (both to avoid blocking GC of those
# objects, and also to prevent "stale" values from being passed on to a new
# local when the index is reused).
#
# Because we need to null out freed slots, we need to keep references to
# ALL the locals arrays, so we can null out the appropriate slots in all of
# them. This is why we need to use a finalizer to clean up the locals array
# when the EC goes out of scope.
class AbstractLocals
def initialize
@free = []
@lock = Mutex.new
@all_arrays = {}
@next = 0
end
def synchronize
@lock.synchronize { yield }
end
if Concurrent.on_cruby?
def weak_synchronize
yield
end
else
alias_method :weak_synchronize, :synchronize
end
def next_index(local)
index = synchronize do
if @free.empty?
@next += 1
else
@free.pop
end
end
# When the local goes out of scope, we should free the associated index
# and all values stored into it.
ObjectSpace.define_finalizer(local, local_finalizer(index))
index
end
def free_index(index)
weak_synchronize do
# The cost of GC'ing a TLV is linear in the number of ECs using local
# variables. But that is natural! More ECs means more storage is used
# per local variable. So naturally more CPU time is required to free
# more storage.
#
# DO NOT use each_value which might conflict with new pair assignment
# into the hash in #set method.
@all_arrays.values.each do |locals|
locals[index] = nil
end
# free index has to be published after the arrays are cleared:
@free << index
end
end
def fetch(index)
locals = self.locals
value = locals ? locals[index] : nil
if nil == value
yield
elsif NULL.equal?(value)
nil
else
value
end
end
def set(index, value)
locals = self.locals!
locals[index] = (nil == value ? NULL : value)
value
end
private
# When the local goes out of scope, clean up that slot across all locals currently assigned.
def local_finalizer(index)
proc do
free_index(index)
end
end
# When a thread/fiber goes out of scope, remove the array from @all_arrays.
def thread_fiber_finalizer(array_object_id)
proc do
weak_synchronize do
@all_arrays.delete(array_object_id)
end
end
end
# Returns the locals for the current scope, or nil if none exist.
def locals
raise NotImplementedError
end
# Returns the locals for the current scope, creating them if necessary.
def locals!
raise NotImplementedError
end
end
# @!visibility private
# @!macro internal_implementation_note
# An array-backed storage of indexed variables per thread.
class ThreadLocals < AbstractLocals
def locals
Thread.current.thread_variable_get(:concurrent_thread_locals)
end
def locals!
thread = Thread.current
locals = thread.thread_variable_get(:concurrent_thread_locals)
unless locals
locals = thread.thread_variable_set(:concurrent_thread_locals, [])
weak_synchronize do
@all_arrays[locals.object_id] = locals
end
# When the thread goes out of scope, we should delete the associated locals:
ObjectSpace.define_finalizer(thread, thread_fiber_finalizer(locals.object_id))
end
locals
end
end
# @!visibility private
# @!macro internal_implementation_note
# An array-backed storage of indexed variables per fiber.
class FiberLocals < AbstractLocals
def locals
Thread.current[:concurrent_fiber_locals]
end
def locals!
thread = Thread.current
locals = thread[:concurrent_fiber_locals]
unless locals
locals = thread[:concurrent_fiber_locals] = []
weak_synchronize do
@all_arrays[locals.object_id] = locals
end
# When the fiber goes out of scope, we should delete the associated locals:
ObjectSpace.define_finalizer(Fiber.current, thread_fiber_finalizer(locals.object_id))
end
locals
end
end
private_constant :AbstractLocals, :ThreadLocals, :FiberLocals
end
|