1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272
|
# shareable_constant_value: literal
# Ruby 2.0+ backport of `Ractor` class
# Extra private methods and instance variables all start with `ractor_`
module Backports
class Ractor
require_relative '../tools/arguments'
require_relative 'cloner'
require_relative 'errors'
require_relative 'queues'
require_relative 'sharing'
# Weak registry associating a ThreadGroup with the Ractor that owns it.
# An entry is added when a Ractor's thread starts and when the main Ractor is
# set up; the current Ractor is found by looking up the current thread's group.
RactorThreadGroups = ::ObjectSpace::WeakMap.new # ThreadGroup => Ractor
private_constant :RactorThreadGroups
# Implementation notes
#
# Uses one `Thread` for each `Ractor`, as well as queues for communication
#
# The incoming queue is strict: contrary to a standard queue, you can't pop from an empty closed queue.
# Since standard queues return `nil` in those conditions, we wrap/unwrap `nil` values and consider
# all `nil` values to be results of closed queues. `ClosedQueueError` is re-raised as `Ractor::ClosedError`.
#
# The outgoing queue is strict and blocking. Same wrapping / raising as incoming,
# with an extra queue to acknowledge when a value has been read (or if the port is closed while waiting).
#
# The last result is a bit tricky as it needs to be pushed on the outgoing queue but can not be blocking.
# For this, we "soft close" the outgoing port.
# Creates a Ractor whose block runs in a dedicated thread.
#
# The first instance created while `Ractor.main` is still unset becomes the
# main Ractor: it adopts the current thread and its block is never run.
#
# @param args [Array] values passed to the block, each one first isolated with
#   `Ractor.ractor_isolate`. A trailing Hash whose only key is `:name` is
#   treated as the `name:` option and removed from the arguments.
# @raise [::ArgumentError] if no block is given
def initialize(*args, &block)
  @ractor_incoming_queue = IncomingQueue.new
  @ractor_outgoing_queue = OutgoingQueue.new
  raise ::ArgumentError, 'must be called with a block' unless block

  kw = args.last
  if kw.is_a?(::Hash) && kw.size == 1 && kw.key?(:name)
    args.pop
    name = kw[:name]
  end
  @ractor_name = name && Backports.coerce_to_str(name)
  @id = Ractor.ractor_next_id
  if Ractor.main.nil? # then initializing main Ractor
    @ractor_thread = ::Thread.current
    @ractor_origin = nil
    @ractor_thread.thread_variable_set(:backports_ractor, self)
  else
    # Keep only the "file:line" part of the call site. The label that follows
    # opens with a backtick before Ruby 3.4 and with a single quote from 3.4 on.
    @ractor_origin = caller(1, 1).first.split(/:in [`']/).first
    args.map! { |a| Ractor.ractor_isolate(a, false) }
    ractor_thread_start(args, block)
  end
end
# Starts the thread backing this Ractor and evaluates `block` inside it.
#
# The new thread is placed in a fresh ThreadGroup (registered in
# `RactorThreadGroups`) and tagged with the `:backports_ractor` thread variable.
# The block runs through `instance_exec`; anything it raises is turned into a
# `RemoteError` wrapped for the outgoing queue. Whatever the outcome, it is
# handed to `ractor_thread_terminate`.
private def ractor_thread_start(args, block)
  ::Thread.new do
    me = ::Thread.current
    @ractor_thread = me
    @ractor_thread_group = ::ThreadGroup.new
    RactorThreadGroups[@ractor_thread_group] = self
    @ractor_thread_group.add(me)
    me.thread_variable_set(:backports_ractor, self)
    outcome = nil
    begin
      outcome = instance_exec(*args, &block)
    rescue ::Exception => failure # rubocop:disable Lint/RescueException
      outcome =
        begin
          # Raising from within `rescue` is the hack that sets `cause` on the new error
          raise RemoteError, "thrown by remote Ractor: #{failure.message}"
        rescue RemoteError => remote
          OutgoingQueue::WrappedException.new(remote)
        end
    ensure
      ractor_thread_terminate(outcome)
    end
  end
end
# Winds down this Ractor's thread: publishes `result`, closes both ports and
# kills every other thread of the Ractor's thread group.
#
# `result` is pushed without waiting for an acknowledgement, and only when the
# outgoing queue is not already closed. The incoming queue is then closed and
# the outgoing queue is soft-closed. If the push raises `ClosedQueueError`
# those closing steps are skipped. The other threads are killed in every case.
#
# @param result [Object] the value (or wrapped exception) to publish
private def ractor_thread_terminate(result)
  begin
    ractor_outgoing_queue.push(result, ack: false) unless ractor_outgoing_queue.closed?
  rescue ::ClosedQueueError
    return # ignore
  end
  ractor_incoming_queue.close
  ractor_outgoing_queue.close(:soft)
ensure
  # TODO: synchronize?
  current = ::Thread.current
  @ractor_thread_group.list.each do |thread|
    thread.kill unless thread == current
  end
end
# Isolates `obj` and appends it to this Ractor's incoming queue.
# Also available as `<<`.
#
# @param obj [Object] the message to deliver
# @param move [Boolean] forwarded to `Ractor.ractor_isolate`
# @return [self]
# @raise [ClosedError] when a `ClosedQueueError` is raised while sending
def send(obj, move: false)
  message = Ractor.ractor_isolate(obj, move)
  ractor_incoming_queue << message
  self
rescue ::ClosedQueueError
  raise ClosedError, 'The incoming-port is already closed'
end
alias_method :<<, :send
# Pops the next value from this Ractor's outgoing queue, asking the queue to
# acknowledge the read.
def take
  outgoing = ractor_outgoing_queue
  outgoing.pop(ack: true)
end
# Returns the name stored at creation (`nil` when no `name:` was given).
def name
@ractor_name
end
# Translates the backing thread's `Thread#status` value into the state label
# shown by `inspect`. Both `false` and `nil` are reported as 'terminated'.
RACTOR_STATE = {
'sleep' => 'blocking',
'run' => 'running',
'aborting' => 'aborting',
false => 'terminated',
nil => 'terminated',
}.freeze
private_constant :RACTOR_STATE
def inspect
state = RACTOR_STATE[@ractor_thread ? @ractor_thread.status : 'run']
info = [
"Ractor:##{@id}",
name,
@ractor_origin,
state,
].compact.join(' ')
"#<#{info}>"
end
# Closes the incoming queue.
#
# @return the value `closed?` reported just before closing
def close_incoming
  queue = ractor_incoming_queue
  already_closed = queue.closed?
  queue.close
  already_closed
end
# Closes the outgoing queue.
#
# @return the value `closed?` reported just before closing
def close_outgoing
  queue = ractor_outgoing_queue
  already_closed = queue.closed?
  queue.close
  already_closed
end
# Pops the next message from this Ractor's incoming queue.
private def receive
  inbox = ractor_incoming_queue
  inbox.pop
end
# Pops from the incoming queue, handing `block` to the queue's `pop`.
#
# @raise [::ArgumentError] if no block is given
private def receive_if(&block)
  raise(::ArgumentError, 'no block given') if block.nil?

  inbox = ractor_incoming_queue
  inbox.pop(&block)
end
# Reads `key` from the local storage of the *current* Ractor
# (`Ractor.current`), whatever the receiver is.
def [](key)
  storage = Ractor.current.ractor_locals
  storage[key]
end
# Stores `value` under `key` in the local storage of the *current* Ractor
# (`Ractor.current`), whatever the receiver is.
def []=(key, value)
  storage = Ractor.current.ractor_locals
  storage[key] = value
end
# @api private
# Local storage of this Ractor: a Hash comparing keys by identity, created on
# first access and reused afterwards.
def ractor_locals
  @ractor_locals ||= {}.tap(&:compare_by_identity)
end
class << self
# Isolates `value` and pushes it on the current Ractor's outgoing queue,
# requesting an acknowledgement.
#
# @param value [Object] the value to publish
# @param move [Boolean] forwarded to `ractor_isolate`
# @raise [ClosedError] when a `ClosedQueueError` is raised while yielding
def yield(value, move: false)
  isolated = ractor_isolate(value, move)
  outgoing = current.ractor_outgoing_queue
  outgoing.push(isolated, ack: true)
rescue ::ClosedQueueError
  raise ClosedError, 'The outgoing-port is already closed'
end
# Receives the next message of the current Ractor by invoking its private
# `receive`. Also available as `recv`.
def receive
  ractor = current
  ractor.__send__(:receive)
end
alias_method :recv, :receive
# Invokes the current Ractor's private `receive_if`, forwarding `block`.
def receive_if(&block)
  ractor = current
  ractor.__send__(:receive_if, &block)
end
# Waits until one of `ractors` has a value available or, when `yield_value`
# is given, until a taker is waiting on the current Ractor's outgoing queue.
#
# Polls without blocking, sleeping one millisecond between rounds. For the
# current Ractor the incoming queue is polled; for any other Ractor the
# outgoing queue is.
#
# @param ractors [Array<Ractor>] the Ractors to poll
# @param yield_value [Object] optional value to yield (isolated with `ractor_isolate`)
# @param move [Boolean] forwarded to `ractor_isolate`
# @return [Array] `[:receive, value]` for the current Ractor, `[ractor, value]`
#   for another Ractor, or `[:yield, nil]` once `yield_value` has been pushed
# @raise [::ArgumentError] if neither a Ractor nor `yield_value` is given
def select(*ractors, yield_value: not_given = true, move: false)
  me = Ractor.current
  # Pair each queue to poll with the tag reported when it delivers a value
  sources = ractors.map do |ractor|
    if ractor == me
      [ractor.ractor_incoming_queue, :receive]
    else
      [ractor.ractor_outgoing_queue, ractor]
    end
  end

  if not_given
    raise ::ArgumentError, 'specify at least one ractor or `yield_value`' if ractors.empty?
  else
    outgoing = current.ractor_outgoing_queue
    yield_value = ractor_isolate(yield_value, move)
  end

  while true # rubocop:disable Style/InfiniteLoop
    # Don't `loop`, in case of `ClosedError` (not that there should be any)
    sources.each do |queue, tag|
      queue.pop_non_blocking { |val| return [tag, val] }
    end

    if outgoing && outgoing.num_waiting > 0
      # Not quite atomic...
      outgoing.push(yield_value, ack: true)
      return [:yield, nil]
    end

    sleep(0.001)
  end
end
# Runs `ractor_check_shareability?(obj, true)` and returns `obj` when it succeeds.
#
# @raise [Ractor::Error] when the check returns a falsy value
def make_shareable(obj)
  unless ractor_check_shareability?(obj, true)
    raise Ractor::Error, '#freeze does not freeze object correctly'
  end

  obj
end
# Returns the result of `ractor_check_shareability?(obj, false)`.
# NOTE(review): the second argument is `true` in `make_shareable`; presumably
# `false` means "check without freezing" — confirm against `sharing`.
def shareable?(obj)
ractor_check_shareability?(obj, false)
end
# Returns the Ractor recorded in the current thread's `:backports_ractor`
# thread variable. When none is recorded, it is looked up with
# `ractor_find_current` and stored in that variable.
def current
  thread = ::Thread.current
  thread.thread_variable_get(:backports_ractor) ||
    thread.thread_variable_set(:backports_ractor, ractor_find_current)
end
# Counts the Ractor instances found by `ObjectSpace` for which `ractor_live?` is truthy.
def count
  live = 0
  ::ObjectSpace.each_object(Ractor) { |ractor| live += 1 if ractor.ractor_live? }
  live
end
# @api private
# Kills and joins the thread of every Ractor other than the current one, then
# clears the current Ractor's incoming queue.
def ractor_reset
  ::ObjectSpace.each_object(Ractor) do |ractor|
    next if ractor == Ractor.current

    thread = ractor.ractor_thread
    next unless thread

    thread.kill
    thread.join
  end
  Ractor.current.ractor_incoming_queue.clear
end
# @api private
# Increments the counter kept in `@id` (starting from 0) and returns its new value.
def ractor_next_id
  @id = (@id || 0) + 1
end
# The main Ractor (`@main`, assigned by `ractor_init`); `nil` until then
attr_reader :main
# Sets up class-level state: a WeakMap in `@ractor_shareable`, the main Ractor
# in `@main`, and the registration of `ThreadGroup::Default` as belonging to it.
private def ractor_init
  @ractor_shareable = ::ObjectSpace::WeakMap.new
  # `@main` is still unset while `Ractor.new` runs; `initialize` relies on that
  main_ractor = Ractor.new { nil }
  @main = main_ractor
  RactorThreadGroups[::ThreadGroup::Default] = main_ractor
end
# Looks up the Ractor registered for the current thread's ThreadGroup.
#
# @return the registered Ractor, or `nil` when the group has no entry
private def ractor_find_current
  # `::Thread` is fully qualified like the other core constants in this file
  RactorThreadGroups[::Thread.current.group]
end
end
# @api private
# Truthy while this Ractor counts as live: either its thread is not recorded
# yet, or that thread's `status` is truthy.
def ractor_live?
  # May happen if `count` is called from another thread before `initialize` has completed
  return true unless defined?(@ractor_thread)

  @ractor_thread.status
end
# @api private
# Readers for the two queues and the backing thread
attr_reader :ractor_outgoing_queue, :ractor_incoming_queue, :ractor_thread
# Runs once while the class body is evaluated; sets up the main Ractor (see `ractor_init`)
ractor_init
end
end
|