File: ruby_timeout_queue.rb

package info (click to toggle)
ruby-concurrent 1.3.6-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 4,152 kB
  • sloc: ruby: 30,953; java: 6,128; ansic: 293; makefile: 26; sh: 19
file content (55 lines) | stat: -rw-r--r-- 1,388 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
module Concurrent
  module Collection
    # @!visibility private
    # @!macro ruby_timeout_queue
    #
    # A +::Queue+ subclass that adds a +timeout:+ keyword to {#pop} for Ruby
    # versions older than 3.2. Timed waits are implemented with a private
    # mutex / condition variable pair that {#push} signals after every
    # enqueue; non-blocking and untimed pops are delegated straight to
    # +::Queue+.
    class RubyTimeoutQueue < ::Queue
      # Creates the queue and the synchronization primitives used by timed pops.
      #
      # @param args arguments forwarded unchanged to +::Queue#initialize+
      # @raise [RuntimeError] when running on Ruby 3.2 or later
      def initialize(*args)
        # Compare version components numerically; a plain String comparison
        # would order e.g. '3.10' or '10.0' before '3.2'.
        if (RUBY_VERSION.split('.').map(&:to_i) <=> [3, 2]) >= 0
          raise "#{self.class.name} is not needed on Ruby 3.2 or later, use ::Queue instead"
        end

        super(*args)

        @mutex = Mutex.new
        @cond_var = ConditionVariable.new
      end

      # Enqueues +obj+ and wakes one thread waiting in a timed {#pop}.
      #
      # @param obj [Object] the object to enqueue
      # @return [RubyTimeoutQueue] the queue itself, so that calls can be
      #   chained (+queue << a << b+) exactly as with +::Queue+
      def push(obj)
        @mutex.synchronize do
          super(obj)
          @cond_var.signal
        end
        # The synchronize block evaluates to the result of #signal, not the
        # queue, so return self explicitly to keep ::Queue's contract.
        self
      end
      alias_method :enq, :push
      alias_method :<<, :push

      # Removes and returns the next object from the queue.
      #
      # @param non_block [Boolean] when true, do not wait for an object
      # @param timeout [Numeric, nil] maximum number of seconds to wait for an
      #   object; +nil+ waits indefinitely
      # @return [Object, nil] the dequeued object, or +nil+ if +timeout+
      #   elapsed while the queue was still empty
      # @raise [ArgumentError] if both +non_block+ and +timeout+ are given
      # @raise [ThreadError] if +non_block+ is true and the queue is empty
      def pop(non_block = false, timeout: nil)
        if non_block && timeout
          raise ArgumentError, "can't set a timeout if non_block is enabled"
        end

        if non_block
          super(true)
        elsif timeout
          @mutex.synchronize do
            deadline = Concurrent.monotonic_time + timeout
            # Re-check emptiness after every wake-up: the wait can end early,
            # and another consumer may already have taken the pushed object.
            # NOTE(review): nothing here wakes waiters when the queue is
            # closed, so a timed pop on a closed, empty queue appears to wait
            # out its full timeout -- confirm whether callers rely on close.
            while (now = Concurrent.monotonic_time) < deadline && empty?
              @cond_var.wait(@mutex, deadline - now)
            end
            begin
              # Non-blocking on purpose: consumers that bypass @mutex
              # (untimed or non-blocking pops) may have emptied the queue.
              return super(true)
            rescue ThreadError
              # still empty
              nil
            end
          end
        else
          super(false)
        end
      end
      alias_method :deq, :pop
      alias_method :shift, :pop
    end
    private_constant :RubyTimeoutQueue
  end
end