File: channel.rb

package info (click to toggle)
ruby-concurrent 1.1.6%2Bdfsg-5
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 30,284 kB
  • sloc: ruby: 30,875; java: 6,117; javascript: 1,114; ansic: 288; makefile: 10; sh: 6
file content (450 lines) | stat: -rw-r--r-- 16,603 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
# @!macro warn.edge
module Concurrent
  module Promises

    # A first in first out channel that accepts messages with push family of methods and returns
    # messages with pop family of methods.
    # Pop and push operations can be represented as futures, see {#pop_op} and {#push_op}.
    # The capacity of the channel can be limited to support back pressure, use capacity option in {#initialize}.
    # {#pop} method blocks and {#pop_op} returns pending future if there is no message in the channel.
    # If the capacity is limited the {#push} method blocks and {#push_op} returns pending future.
    #
    # {include:file:docs-source/channel.out.md}
    # @!macro warn.edge
    class Channel < Concurrent::Synchronization::Object

      # TODO (pitr-ch 06-Jan-2019): rename to Conduit?, to be able to place it into Concurrent namespace?
      # TODO (pitr-ch 14-Jan-2019): better documentation, do few examples from go
      # TODO (pitr-ch 12-Dec-2018): implement channel closing,
      #   - as a child class? To also have a channel which cannot be closed.
      # TODO (pitr-ch 26-Dec-2016): replace with lock-free implementation, at least getting a message when available should be lock free same goes for push with space available

      # @!macro channel.warn.blocks
      #   @note This function potentially blocks current thread until it can continue.
      #     Be careful it can deadlock.
      #
      # @!macro channel.param.timeout
      #   @param [Numeric] timeout the maximum time in seconds to wait.

      safe_initialization!

      # Default capacity of the Channel, makes it accept unlimited number of messages.
      UNLIMITED_CAPACITY = ::Object.new
      class << UNLIMITED_CAPACITY
        include Comparable

        # Always reports "greater than", so `UNLIMITED_CAPACITY > n` holds for any n.
        def <=>(_other)
          1
        end

        # Readable name used when the capacity is printed.
        def to_s
          'unlimited'
        end
      end

      # Internal sentinel returned by the ns_* helpers when no message was found.
      # A dedicated object is used so that nil can still be returned as a regular message.
      NOTHING = Object.new
      private_constant :NOTHING

      # An object which matches anything (with #===)
      ANY = Object.new
      class << ANY
        # Case equality accepts every object, so ANY works as a catch-all matcher.
        def ===(_other)
          true
        end

        def to_s
          'ANY'
        end
      end

      # Create channel.
      # @param [Integer, UNLIMITED_CAPACITY] capacity the maximum number of messages which can be stored in the channel.
      def initialize(capacity = UNLIMITED_CAPACITY)
        super()
        @Mutex    = Mutex.new
        @Capacity = capacity
        # The three stores below are flat arrays holding fixed-size records:
        #   @Messages    - buffered messages, one slot each
        #   @PendingPush - pairs of [message, pushed_future]
        #   @Probes      - triples of [probe, include_channel, matcher]
        # TODO (pitr-ch 28-Jan-2019): consider linked lists or other data structures for following attributes, things are being deleted from the middle
        @Messages    = []
        @PendingPush = []
        @Probes      = []
      end

      # Push the message into the channel if there is space available.
      # @param [Object] message
      # @return [true, false]
      def try_push(message)
        # Non-blocking: the work happens in ns_try_push while the channel lock is held.
        @Mutex.synchronize do
          ns_try_push message
        end
      end

      # Returns future which will fulfill when the message is pushed to the channel.
      # @!macro chanel.operation_wait
      #   If the operation is later waited on with a timeout, e.g. `channel.pop_op.wait(1)`,
      #   the timeout will not prevent the channel from fulfilling the operation later.
      #   The operation has to be either processed later
      #   ```ruby
      #   pop_op = channel.pop_op
      #   if pop_op.wait(1)
      #     process_message pop_op.value
      #   else
      #     pop_op.then { |message| log_unprocessed_message message }
      #   end
      #   ```
      #   or the operation can be prevented from completion after timing out by using
      #   `channel.pop_op.wait(1, [true, nil, nil])`.
      #   It will fulfill the operation on timeout preventing channel from doing the operation,
      #   e.g. popping a message.
      #
      # @param [Object] message
      # @return [ResolvableFuture(self)]
      def push_op(message)
        @Mutex.synchronize do
          # Accepted right away (by a waiting probe or the buffer): nothing to wait for.
          next Promises.fulfilled_future(self) if ns_try_push(message)

          # Otherwise park the message together with the future that represents the push.
          Promises.resolvable_future.tap do |pending|
            @PendingPush.push message, pending
          end
        end
      end

      # Blocks current thread until the message is pushed into the channel.
      #
      # @!macro channel.warn.blocks
      # @param [Object] message
      # @!macro channel.param.timeout
      # @return [self, true, false] self implies timeout was not used, true implies timeout was used
      #   and it was pushed, false implies it was not pushed within timeout.
      def push(message, timeout = nil)
        pending = @Mutex.synchronize do
          if ns_try_push(message)
            # Pushed without waiting; the return value depends on whether a timeout was requested.
            return timeout ? true : self
          end

          # TODO (pitr-ch 06-Jan-2019): clear timed out pushes in @PendingPush, null messages
          Promises.resolvable_future.tap { |future| @PendingPush.push message, future }
        end

        # On timeout the future is resolved here, which stops the channel from completing the push later.
        outcome = pending.wait!(timeout, [true, self, nil])
        outcome == pending ? self : outcome
      end

      # @!macro promises.channel.try_pop
      #   Pop a message from the channel if there is one available.
      #   @param [Object] no_value returned when there is no message available
      #   @return [Object, no_value] message or `no_value` when there is no message
      def try_pop(no_value = nil)
        # Popping "anything" is the matching variant with the catch-all matcher.
        try_pop_matching(ANY, no_value)
      end

      # @!macro promises.channel.try_pop
      # @!macro promises.channel.param.matcher
      #   @param [#===] matcher only consider message which matches `matcher === a_message`
      def try_pop_matching(matcher, no_value = nil)
        @Mutex.synchronize do
          message = ns_shift_message matcher
          if message != NOTHING
            # A buffer slot was just freed: move the oldest pending push into it, the same way
            # pop_matching and ns_pop_op do. Otherwise the waiting push would stay pending
            # although there is room, and a later push could overtake it.
            new_message = ns_consume_pending_push ANY
            @Messages.push new_message unless new_message == NOTHING
            return message
          end

          # Nothing suitable in the buffer, take the message directly from a waiting push.
          message = ns_consume_pending_push matcher
          message != NOTHING ? message : no_value
        end
      end

      # @!macro promises.channel.pop_op
      #   Returns a future which will become fulfilled with a value from the channel when one is available.
      #   @!macro chanel.operation_wait
      #
      #   @param [ResolvableFuture] probe the future which will be fulfilled with a channel value
      #   @return [Future(Object)] the probe, its value will be the message when available.
      def pop_op(probe = Promises.resolvable_future)
        @Mutex.synchronize do
          # catch-all matcher, and the probe receives the bare message (no channel attached)
          ns_pop_op ANY, probe, false
        end
      end

      # @!macro promises.channel.pop_op
      # @!macro promises.channel.param.matcher
      def pop_op_matching(matcher, probe = Promises.resolvable_future)
        @Mutex.synchronize do
          # the probe receives the bare message (no channel attached)
          ns_pop_op matcher, probe, false
        end
      end

      # @!macro promises.channel.pop
      #   Blocks current thread until a message is available in the channel for popping.
      #
      #   @!macro channel.warn.blocks
      #   @!macro channel.param.timeout
      #   @param [Object] timeout_value a value returned by the method when it times out
      #   @return [Object, nil] message or nil when timed out
      def pop(timeout = nil, timeout_value = nil)
        # Blocking pop of "anything" is the matching variant with the catch-all matcher.
        pop_matching(ANY, timeout, timeout_value)
      end

      # @!macro promises.channel.pop
      # @!macro promises.channel.param.matcher
      def pop_matching(matcher, timeout = nil, timeout_value = nil)
        # TODO (pitr-ch 27-Jan-2019): should it try to match pending pushes if it fails to match in the buffer?
        #   Maybe only if the size is zero. It could be surprising: if the channel is used as a throttle it might be
        #   expected that it will not pop if the buffer is full of messages which did not match, it might be
        #   expected that it will block until the message is added to the buffer rather than
        #   return even if the buffer is full. User might expect that it has to be in the buffer first.
        waiting = @Mutex.synchronize do
          buffered = ns_shift_message matcher
          unless buffered == NOTHING
            # a slot was freed in the buffer, let the oldest pending push take it
            refill = ns_consume_pending_push ANY
            @Messages.push refill unless refill == NOTHING
            return buffered
          end

          # nothing suitable buffered, try to take the message directly from a waiting push
          handed_over = ns_consume_pending_push matcher
          return handed_over if handed_over != NOTHING

          # nothing available at all: register a probe and wait for it outside of the lock
          Promises.resolvable_future.tap { |probe| @Probes.push probe, false, matcher }
        end

        waiting.value!(timeout, timeout_value, [true, timeout_value, nil])
      end

      # @!macro promises.channel.peek
      #   Behaves as {#try_pop} but it does not remove the message from the channel
      #   @param [Object] no_value returned when there is no message available
      #   @return [Object, no_value] message or `no_value` when there is no message
      def peek(no_value = nil)
        # Peeking at "anything" is the matching variant with the catch-all matcher.
        peek_matching(ANY, no_value)
      end

      # @!macro promises.channel.peek
      # @!macro promises.channel.param.matcher
      def peek_matching(matcher, no_value = nil)
        @Mutex.synchronize do
          # look into the buffer first, without removing anything
          buffered = ns_shift_message(matcher, false)
          next buffered unless buffered == NOTHING

          # then into the pending pushes, again leaving a still-pending push in place
          offered = ns_consume_pending_push(matcher, false)
          offered == NOTHING ? no_value : offered
        end
      end

      # @!macro promises.channel.try_select
      #   If message is available in the receiver or any of the provided channels
      #   the channel message pair is returned. If there is no message nil is returned.
      #   The returned channel is the origin of the message.
      #
      #   @param [Channel, ::Array<Channel>] channels
      #   @return [::Array(Channel, Object), nil]
      #     pair [channel, message] if one of the channels is available for reading
      def try_select(channels)
        # Selecting "anything" is the matching variant with the catch-all matcher.
        try_select_matching(ANY, channels)
      end

      # @!macro promises.channel.try_select
      # @!macro promises.channel.param.matcher
      def try_select_matching(matcher, channels)
        # The receiver is asked first, then the given channels in order; the first channel
        # able to hand over a matching message wins.
        [self, *channels].each do |candidate|
          received = candidate.try_pop_matching(matcher, NOTHING)
          return [candidate, received] if received != NOTHING
        end
        nil
      end

      # @!macro promises.channel.select_op
      #   When message is available in the receiver or any of the provided channels
      #   the future is fulfilled with a channel message pair.
      #   The returned channel is the origin of the message.
      #   @!macro chanel.operation_wait
      #
      #   @param [Channel, ::Array<Channel>] channels
      #   @param [ResolvableFuture] probe the future which will be fulfilled with the message
      #   @return [ResolvableFuture(::Array(Channel, Object))] a future which is fulfilled with
      #     pair [channel, message] when one of the channels is available for reading
      def select_op(channels, probe = Promises.resolvable_future)
        # Selecting "anything" is the matching variant with the catch-all matcher.
        select_op_matching(ANY, channels, probe)
      end

      # @!macro promises.channel.select_op
      # @!macro promises.channel.param.matcher
      def select_op_matching(matcher, channels, probe = Promises.resolvable_future)
        # The same probe is offered to the receiver and to every given channel, in that order.
        participants = [self, *channels]
        participants.each do |participant|
          participant.partial_select_op(matcher, probe)
        end
        probe
      end

      # @!macro promises.channel.select
      #   As {#select_op} but does not return future,
      #   it blocks the current thread instead until there is a message available
      #   in the receiver or in any of the channels.
      #
      #   @!macro channel.warn.blocks
      #   @param [Channel, ::Array<Channel>] channels
      #   @!macro channel.param.timeout
      #   @return [::Array(Channel, Object), nil] pair [channel, message] or nil when timed out
      #   @see #select_op
      def select(channels, timeout = nil)
        # Selecting "anything" is the matching variant with the catch-all matcher.
        select_matching(ANY, channels, timeout)
      end

      # @!macro promises.channel.select
      # @!macro promises.channel.param.matcher
      def select_matching(matcher, channels, timeout = nil)
        # Wait on the select future; on timeout it is resolved with nil, which is also the returned value.
        select_op_matching(matcher, channels).value!(timeout, nil, [true, nil, nil])
      end

      # @return [Integer] The number of messages currently stored in the channel.
      def size
        # Only buffered messages are counted; the buffer is read while holding the channel lock.
        @Mutex.synchronize do
          @Messages.length
        end
      end

      # @return [Integer] Maximum capacity of the Channel.
      def capacity
        # Read without taking the lock: within this class @Capacity is assigned only in #initialize.
        @Capacity
      end

      # @return [String] Short string representation.
      def to_s
        # Reuse the inherited representation without its closing '>' and append the fill level.
        prefix = super[0..-2]
        format '%s capacity taken %s of %s>', prefix, size, @Capacity
      end

      alias_method :inspect, :to_s

      class << self

        # Class-level conveniences: the first channel of the list acts as the receiver
        # and the remaining ones are passed to the instance method of the same name.

        # @see #try_select
        # @return [::Array(Channel, Object)]
        def try_select(channels)
          receiver = channels.first
          receiver.try_select(channels[1..-1])
        end

        # @see #select_op
        # @return [Future(::Array(Channel, Object))]
        def select_op(channels, probe = Promises.resolvable_future)
          receiver = channels.first
          receiver.select_op(channels[1..-1], probe)
        end

        # @see #select
        # @return [::Array(Channel, Object), nil]
        def select(channels, timeout = nil)
          receiver = channels.first
          receiver.select(channels[1..-1], timeout)
        end

        # @see #try_select_matching
        # @return [::Array(Channel, Object)]
        def try_select_matching(matcher, channels)
          receiver = channels.first
          receiver.try_select_matching(matcher, channels[1..-1])
        end

        # @see #select_op_matching
        # @return [Future(::Array(Channel, Object))]
        def select_op_matching(matcher, channels, probe = Promises.resolvable_future)
          receiver = channels.first
          receiver.select_op_matching(matcher, channels[1..-1], probe)
        end

        # @see #select_matching
        # @return [::Array(Channel, Object), nil]
        def select_matching(matcher, channels, timeout = nil)
          receiver = channels.first
          receiver.select_matching(matcher, channels[1..-1], timeout)
        end
      end

      # @!visibility private
      def partial_select_op(matcher, probe)
        @Mutex.synchronize do
          # include_channel is true so a fulfilled probe carries the [channel, message] pair
          ns_pop_op matcher, probe, true
        end
      end

      private

      # Tries to satisfy +probe+ with a message matching +matcher+; has to be called with the lock held.
      # The buffer is consulted first, then the pending pushes; when neither can supply
      # a message and the probe is still pending, the probe is registered in @Probes.
      #
      # @param [#===] matcher only messages for which `matcher === message` are considered
      # @param [ResolvableFuture] probe the future to be fulfilled with the message
      # @param [true, false] include_channel fulfill with `[self, message]` instead of the bare message
      # @return [ResolvableFuture] the probe
      def ns_pop_op(matcher, probe, include_channel)
        # Only peek at the buffer: the probe may already be resolved (e.g. by another channel
        # taking part in the same select), in which case the buffer has to stay as it is.
        # Removing first and putting the message back with unshift would move a message
        # matched in the middle of the buffer to its front.
        message = ns_shift_message matcher, false

        # got message from buffer
        if message != NOTHING
          if probe.fulfill(include_channel ? [self, message] : message, false)
            # The peeked message is the first matching one, so the first identical object
            # in the buffer is exactly that message; remove it without re-running the matcher.
            @Messages.delete_at @Messages.index { |buffered| buffered.equal?(message) }
            new_message = ns_consume_pending_push ANY
            @Messages.push new_message unless new_message == NOTHING
          end
          return probe
        end

        # no message in buffer, try to pair with a pending push
        i = 0
        while true
          message, pushed = @PendingPush[i, 2]
          break if pushed.nil?

          unless matcher === message
            i += 2
            next
          end

          value = include_channel ? [self, message] : message
          if Promises::Resolvable.atomic_resolution(probe  => [true, value, nil],
                                                    pushed => [true, self, nil])
            @PendingPush[i, 2] = []
            return probe
          end

          return probe if probe.resolved?

          # so pushed.resolved? has to be true, remove the push;
          # i is not advanced because the following pair has just moved to index i
          @PendingPush[i, 2] = []
        end

        # no push to pair with
        # TODO (pitr-ch 11-Jan-2019): clear up probes when timed out, use callback
        @Probes.push probe, include_channel, matcher if probe.pending?
        probe
      end

      # Looks for the first pending push whose message matches; has to be called with the lock held.
      # Pushes found to be already resolved (e.g. timed out) are dropped along the way.
      #
      # @param [#===] matcher only messages for which `matcher === message` are considered
      # @param [true, false] remove when true the push is removed and its future fulfilled,
      #   when false a still pending push is left in place (peek)
      # @return [Object, NOTHING] the message, or NOTHING when no pending push can supply one
      def ns_consume_pending_push(matcher, remove = true)
        i = 0
        while true
          message, pushed = @PendingPush[i, 2]
          return NOTHING unless pushed

          unless matcher === message
            i += 2
            next
          end

          resolved = pushed.resolved?
          removed  = remove || resolved
          @PendingPush[i, 2] = [] if removed
          # can fail if timed-out, so try without error
          if remove ? pushed.fulfill(self, false) : !resolved
            # pushed fulfilled so actually push the message
            return message
          end

          # Advance only when the pair is still in place; after a removal the following
          # pair already sits at index i and would otherwise be skipped.
          i += 2 unless removed
        end
      end

      # Tries to deliver +message+ without waiting; has to be called with the lock held.
      # A waiting probe whose matcher accepts the message is preferred, otherwise the message
      # is stored in the buffer when there is free capacity.
      #
      # @return [true, false] whether the message was accepted
      def ns_try_push(message)
        offset = 0
        # @Probes is a flat list of [probe, include_channel, matcher] triples
        @Probes.each_slice(3) do |probe, include_channel, matcher|
          if matcher === message
            payload = include_channel ? [self, message] : message
            # fulfill returns false when the probe was resolved in the meantime
            if probe.fulfill(payload, false)
              @Probes[offset, 3] = []
              return true
            end
          end
          offset += 3
        end

        return false unless @Capacity > @Messages.size

        @Messages.push message
        true
      end

      # Finds the first buffered message accepted by +matcher+; has to be called with the lock held.
      #
      # @param [#===] matcher only messages for which `matcher === message` are considered
      # @param [true, false] remove whether the found message is also taken out of the buffer
      # @return [Object, NOTHING] the message, or NOTHING when no buffered message matches
      def ns_shift_message(matcher, remove = true)
        @Messages.each_with_index do |candidate, position|
          next unless matcher === candidate

          @Messages.delete_at position if remove
          return candidate
        end
        NOTHING
      end
    end
  end
end