File: ruby_thread_local_var.rb

package info (click to toggle)
ruby-concurrent 1.1.6%2Bdfsg-5
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 30,284 kB
  • sloc: ruby: 30,875; java: 6,117; javascript: 1,114; ansic: 288; makefile: 10; sh: 6
file content (171 lines) | stat: -rw-r--r-- 5,308 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
require 'thread'
require 'concurrent/atomic/abstract_thread_local_var'

module Concurrent

  # @!visibility private
  # @!macro internal_implementation_note
  class RubyThreadLocalVar < AbstractThreadLocalVar

    # Each thread has a (lazily initialized) array of thread-local variable values
    # Each time a new thread-local var is created, we allocate an "index" for it
    # For example, if the allocated index is 1, that means slot #1 in EVERY
    #   thread's thread-local array will be used for the value of that TLV
    #
    # The good thing about using a per-THREAD structure to hold values, rather
    #   than a per-TLV structure, is that no synchronization is needed when
    #    reading and writing those values (since the structure is only ever
    #    accessed by a single thread)
    #
    # Of course, when a TLV is GC'd, 1) we need to recover its index for use
    #   by other new TLVs (otherwise the thread-local arrays could get bigger
    #   and bigger with time), and 2) we need to null out all the references
    #   held in the now-unused slots (both to avoid blocking GC of those objects,
    #   and also to prevent "stale" values from being passed on to a new TLV
    #   when the index is reused)
    # Because we need to null out freed slots, we need to keep references to
    #   ALL the thread-local arrays -- ARRAYS is for that
    # But when a Thread is GC'd, we need to drop the reference to its thread-local
    #   array, so we don't leak memory

    # @!visibility private
    FREE   = []
    LOCK   = Mutex.new
    ARRAYS = {} # used as a hash set
    # noinspection RubyClassVariableUsageInspection
    @@next = 0
    QUEUE  = Queue.new
    THREAD = Thread.new do
      while true
        method, i = QUEUE.pop
        case method
        when :thread_local_finalizer
          LOCK.synchronize do
            FREE.push(i)
            # The cost of GC'ing a TLV is linear in the number of threads using TLVs
            # But that is natural! More threads means more storage is used per TLV
            # So naturally more CPU time is required to free more storage
            ARRAYS.each_value do |array|
              array[i] = nil
            end
          end
        when :thread_finalizer
          LOCK.synchronize do
            # The thread which used this thread-local array is now gone
            # So don't hold onto a reference to the array (thus blocking GC)
            ARRAYS.delete(i)
          end
        end
      end
    end

    private_constant :FREE, :LOCK, :ARRAYS, :QUEUE, :THREAD

    # @!macro thread_local_var_method_get
    def value
      if (array = get_threadlocal_array)
        value = array[@index]
        if value.nil?
          default
        elsif value.equal?(NULL)
          nil
        else
          value
        end
      else
        default
      end
    end

    # @!macro thread_local_var_method_set
    def value=(value)
      me = Thread.current
      # We could keep the thread-local arrays in a hash, keyed by Thread
      # But why? That would require locking
      # Using Ruby's built-in thread-local storage is faster
      unless (array = get_threadlocal_array(me))
        array = set_threadlocal_array([], me)
        LOCK.synchronize { ARRAYS[array.object_id] = array }
        ObjectSpace.define_finalizer(me, self.class.thread_finalizer(array.object_id))
      end
      array[@index] = (value.nil? ? NULL : value)
      value
    end

    protected

    # @!visibility private
    # noinspection RubyClassVariableUsageInspection
    def allocate_storage
      @index = LOCK.synchronize do
        FREE.pop || begin
          result = @@next
          @@next += 1
          result
        end
      end
      ObjectSpace.define_finalizer(self, self.class.thread_local_finalizer(@index))
    end

    # @!visibility private
    def self.thread_local_finalizer(index)
      # avoid error: can't be called from trap context
      proc { QUEUE.push [:thread_local_finalizer, index] }
    end

    # @!visibility private
    def self.thread_finalizer(id)
      # avoid error: can't be called from trap context
      proc { QUEUE.push [:thread_finalizer, id] }
    end

    private

    if Thread.instance_methods.include?(:thread_variable_get)

      def get_threadlocal_array(thread = Thread.current)
        thread.thread_variable_get(:__threadlocal_array__)
      end

      def set_threadlocal_array(array, thread = Thread.current)
        thread.thread_variable_set(:__threadlocal_array__, array)
      end

    else

      def get_threadlocal_array(thread = Thread.current)
        thread[:__threadlocal_array__]
      end

      def set_threadlocal_array(array, thread = Thread.current)
        thread[:__threadlocal_array__] = array
      end
    end

    # This exists only for use in testing
    # @!visibility private
    def value_for(thread)
      if (array = get_threadlocal_array(thread))
        value = array[@index]
        if value.nil?
          get_default
        elsif value.equal?(NULL)
          nil
        else
          value
        end
      else
        get_default
      end
    end

    # @!visibility private
    def get_default
      if @default_block
        raise "Cannot use default_for with default block"
      else
        @default
      end
    end
  end
end