File: queues.rb

package info (click to toggle)
ruby-backports 3.25.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,912 kB
  • sloc: ruby: 11,759; makefile: 6
file content (66 lines) | stat: -rw-r--r-- 1,438 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# shareable_constant_value: literal

require_relative 'filtered_queue'

module Backports
  class Ractor
    # Standard ::Queue but raises if popping and closed
    #
    # Subclass of FilteredQueue in which ClosedQueueError is an alias of
    # Ractor::ClosedError (presumably so that the parent class raises the
    # Ractor-flavoured error on a closed queue -- confirm in FilteredQueue).
    class BaseQueue < FilteredQueue
      ClosedQueueError = Ractor::ClosedError

      # Tries to pop a message without waiting (`pop(timeout: 0)`).
      #
      # yields message (if any) and returns the value of the block.
      # Returns nil, without yielding, when the pop raises TimeoutError.
      #
      # Only the pop itself is guarded: a TimeoutError raised from inside the
      # block propagates to the caller instead of being swallowed into nil.
      # Any other error raised by `pop` is passed through untouched.
      def pop_non_blocking
        begin
          message = pop(timeout: 0)
        rescue TimeoutError
          return nil
        end

        yield message
      end
    end

    # Queue tagged as the :incoming port (see TYPE).
    class IncomingQueue < BaseQueue
      TYPE = :incoming

      # Always refuses by raising Ractor::Error.
      # NOTE(review): presumably a hook invoked by the parent queue on a
      # nested pop -- confirm against FilteredQueue.
      def reenter
        raise Ractor::Error, 'Can not reenter'
      end
      protected :reenter
    end

    # Queue tagged as the :outgoing port (see TYPE).
    # * Exceptions travel wrapped in a WrappedException and are re-raised by `pop`
    # * `push` takes a mandatory `ack:`; when truthy it blocks until a `pop` acknowledges
    class OutgoingQueue < BaseQueue
      TYPE = :outgoing

      WrappedException = ::Struct.new(:exception, :ractor)

      # Creates the acknowledgement queue, then defers to the parent initializer.
      def initialize
        @ack_queue = ::Queue.new
        super
      end

      # Pops one message, forwarding `timeout:` to the parent implementation.
      # Unless `ack:` is falsy, posts :done on the acknowledgement queue.
      # A WrappedException is not returned: its `exception` is raised instead.
      # The acknowledgement is posted before that raise happens.
      def pop(timeout: nil, ack: true)
        message = super(timeout: timeout)
        @ack_queue.push(:done) if ack

        case message
        when WrappedException then raise message.exception
        else message
        end
      end

      # Closes the queue through the parent `close`.
      # With `how == :soft` nothing else happens; otherwise (default :hard)
      # the queue is cleared and the acknowledgement queue is closed too, so
      # a `push` waiting for its ack gets nil back and raises ClosedError.
      def close(how = :hard)
        super()
        unless how == :soft
          clear
          @ack_queue.close
        end
      end

      # Pushes `obj` through the parent `push` and returns self.
      # When `ack:` is truthy, waits for an acknowledgement first and raises
      # ClosedError if anything other than :done comes back.
      def push(obj, ack:)
        super(obj)
        return self unless ack

        reply = @ack_queue.pop # block until popped
        return self if reply == :done

        raise ClosedError, "The #{self.class::TYPE}-port is already closed"
      end
    end
    private_constant :BaseQueue, :OutgoingQueue, :IncomingQueue
  end
end