1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
|
# shareable_constant_value: literal
require_relative 'filtered_queue'
module Backports
class Ractor
# Queue used for Ractor message passing: behaves like a standard ::Queue,
# except that popping from a closed queue raises.
class BaseQueue < FilteredQueue
  # Closed-queue error class, aliased to the Ractor-level ClosedError.
  # NOTE(review): presumably FilteredQueue resolves this constant through the
  # subclass when it raises -- confirm in filtered_queue.rb.
  ClosedQueueError = Ractor::ClosedError

  # Tries to pop with a zero timeout and yields the message, if any.
  #
  # Returns the block's result, or nil when TimeoutError is raised. Note that
  # the rescue covers the yielded block too, so a TimeoutError raised inside
  # the block is also turned into nil.
  def pop_non_blocking
    begin
      message = pop(timeout: 0)
      yield message
    rescue TimeoutError
      nil
    end
  end
end
# Queue tagged as the :incoming side; it refuses to be re-entered.
class IncomingQueue < BaseQueue
  TYPE = :incoming

  # Always raises Ractor::Error: re-entering is rejected for this queue.
  def reenter
    raise(Ractor::Error, 'Can not reenter')
  end
  protected :reenter
end
# Queue tagged as the :outgoing side. On top of BaseQueue it:
# * re-raises exceptions that were pushed wrapped in a WrappedException
# * adds `ack:` to push, making the pusher block until the value is popped
class OutgoingQueue < BaseQueue
  TYPE = :outgoing

  # Envelope pairing an exception with a ractor; `pop` raises the exception
  # instead of returning the envelope.
  WrappedException = ::Struct.new(:exception, :ractor)

  def initialize
    # Side channel on which `pop` tells a waiting `push` that a value was taken.
    @ack_queue = ::Queue.new
    super
  end

  # Pops the next value; `timeout` is forwarded to the parent implementation.
  #
  # When `ack` is truthy, signals :done on the ack channel before anything
  # else, so the acknowledgement is sent even if the value is then re-raised.
  # Raises the wrapped exception if the value is a WrappedException,
  # otherwise returns the value.
  def pop(timeout: nil, ack: true)
    value = super(timeout: timeout)
    @ack_queue.push(:done) if ack

    case value
    when WrappedException then raise value.exception
    else value
    end
  end

  # Closes the queue through the parent implementation.
  #
  # With `how == :soft` nothing else happens. Any other value (default
  # :hard) additionally calls `clear` and closes the ack channel, so a
  # pusher waiting for an acknowledgement is woken up.
  def close(how = :hard)
    super()
    unless how == :soft
      clear
      @ack_queue.close
    end
  end

  # Pushes `obj`; returns self.
  #
  # When `ack` is truthy, blocks until the ack channel yields something.
  # Anything other than :done (e.g. nil from a closed ack channel) raises
  # ClosedError.
  def push(obj, ack:)
    super(obj)
    return self unless ack

    signal = @ack_queue.pop # block until popped
    return self if signal == :done

    raise ClosedError, "The #{self.class::TYPE}-port is already closed"
  end
end
# Hide the queue classes from code outside this namespace.
private_constant :BaseQueue, :OutgoingQueue, :IncomingQueue
end
end
|