File: unbuffered.rb

package info (click to toggle)
ruby-concurrent 1.1.6%2Bdfsg-5
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 30,284 kB
  • sloc: ruby: 30,875; java: 6,117; javascript: 1,114; ansic: 288; makefile: 10; sh: 6
file content (158 lines) | stat: -rw-r--r-- 5,136 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
require 'concurrent/constants'
require 'concurrent/channel/buffer/base'
require 'concurrent/atomic/atomic_reference'

module Concurrent
  class Channel
    module Buffer

      # A blocking buffer with a size of zero. An item can only be put onto
      # the buffer when a thread is waiting to take. Similarly, an item can
      # only be put onto the buffer when a thread is waiting to put. When
      # either {#put} or {#take} is called and there is no corresponding call
      # in progress, the call will block indefinitely. Any other calls to the
      # same method will queue behind the first call and block as well. As
      # soon as a corresponding put/take call is made an exchange will occur
      # and the first blocked call will return.
      class Unbuffered < Base

        # @!macro channel_buffer_size_reader
        def size
          synchronize do
            putting.empty? ? 0 : 1
          end
        end

        # @!macro channel_buffer_empty_question
        def empty?
          size == 0
        end

        # @!macro channel_buffer_full_question
        def full?
          !empty?
        end

        # @!macro channel_buffer_put
        #
        # Items can only be put onto the buffer when one or more threads are
        # waiting to {#take} items off the buffer. When there is a thread
        # waiting to take an item this method will give its item and return
        # immediately. When there are no threads waiting to take, this method
        # will block. As soon as a thread calls `take` the exchange will
        # occur and this method will return.
        def put(item)
          mine = synchronize do
            return false if ns_closed?

            ref = Concurrent::AtomicReference.new(item)
            if taking.empty?
              putting.push(ref)
            else
              taken = taking.shift
              taken.value = item
              ref.value = nil
            end
            ref
          end
          loop do
            return true if mine.value.nil?
            Thread.pass
          end
        end

        # @!macro channel_buffer_offer
        #
        # Items can only be put onto the buffer when one or more threads are
        # waiting to {#take} items off the buffer. When there is a thread
        # waiting to take an item this method will give its item and return
        # `true` immediately. When there are no threads waiting to take or the
        # buffer is closed, this method will return `false` immediately.
        def offer(item)
          synchronize do
            return false if ns_closed? || taking.empty?

            taken = taking.shift
            taken.value = item
            true
          end
        end

        # @!macro channel_buffer_take
        #
        # Items can only be taken from the buffer when one or more threads are
        # waiting to {#put} items onto the buffer. When there is a thread
        # waiting to put an item this method will take that item and return it
        # immediately. When there are no threads waiting to put, this method
        # will block. As soon as a thread calls `pur` the exchange will occur
        # and this method will return.
        def take
          mine = synchronize do
            return Concurrent::NULL if ns_closed? && putting.empty?

            ref = Concurrent::AtomicReference.new(nil)
            if putting.empty?
              taking.push(ref)
            else
              put = putting.shift
              ref.value = put.value
              put.value = nil
            end
            ref
          end
          loop do
            item = mine.value
            return item if item
            Thread.pass
          end
        end

        # @!macro channel_buffer_poll
        #
        # Items can only be taken off the buffer when one or more threads are
        # waiting to {#put} items onto the buffer. When there is a thread
        # waiting to put an item this method will take the item and return
        # it immediately. When there are no threads waiting to put or the
        # buffer is closed, this method will return `Concurrent::NULL` immediately.
        def poll
          synchronize do
            return Concurrent::NULL if putting.empty?

            put = putting.shift
            value = put.value
            put.value = nil
            value
          end
        end

        # @!macro channel_buffer_next
        #
        # Items can only be taken from the buffer when one or more threads are
        # waiting to {#put} items onto the buffer. This method exhibits the
        # same blocking behavior as {#take}.
        #
        # @see #take
        def next
          item = take
          more = (item != Concurrent::NULL)
          return item, more
        end

        private

        def putting() @putting; end

        def taking() @taking; end

        # @!macro channel_buffer_initialize
        def ns_initialize
          # one will always be empty
          @putting = []
          @taking = []
          self.closed = false
          self.capacity = 1
        end
      end
    end
  end
end