File: queue.rb

package info (click to toggle)
ruby-async 2.36.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 400 kB
  • sloc: ruby: 1,938; makefile: 4
file content (155 lines) | stat: -rw-r--r-- 4,629 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2018-2025, by Samuel Williams.
# Copyright, 2019, by Ryan Musgrave.
# Copyright, 2020-2022, by Bruno Sutic.
# Copyright, 2025, by Jahfer Husain.
# Copyright, 2025, by Shopify Inc.

require_relative "notification"

module Async
	# A thread-safe queue which allows items to be processed in order.
	#
	# This implementation uses Thread::Queue internally for thread safety while
	# maintaining compatibility with the fiber scheduler.
	#
	# It has a compatible interface with {Notification} and {Condition}, except that it's multi-value.
	#
	# @asynchronous This class is thread-safe.
	# @public Since *Async v1*.
	class Queue
		# An error raised when trying to enqueue items to a closed queue.
		# @public Since *Async v2.24*.
		class ClosedError < RuntimeError
		end
		
		# Create a new thread-safe queue.
		#
		# @parameter parent [Interface(:async) | Nil] The parent task to use for async operations.
		def initialize(parent: nil, delegate: Thread::Queue.new)
			@delegate = delegate
			@parent = parent
		end
		
		# @returns [Boolean] Whether the queue is closed.
		def closed?
			@delegate.closed?
		end
		
		# Close the queue, causing all waiting tasks to return `nil`. Any subsequent calls to {enqueue} will raise an exception.
		def close
			@delegate.close
		end
		
		# @returns [Integer] The number of items in the queue.
		def size
			@delegate.size
		end
		
		# @returns [Boolean] Whether the queue is empty.
		def empty?
			@delegate.empty?
		end
		
		# @returns [Integer] The number of tasks waiting for an item.
		def waiting_count
			@delegate.num_waiting
		end
		
		# Add an item to the queue.
		def push(item)
			@delegate.push(item)
		rescue ClosedQueueError
			raise ClosedError, "Cannot enqueue items to a closed queue!"
		end
		
		# Compatibility with {::Queue#push}.
		def <<(item)
			self.push(item)
		end
		
		# Add multiple items to the queue.
		def enqueue(*items)
			items.each{|item| @delegate.push(item)}
		rescue ClosedQueueError
			raise ClosedError, "Cannot enqueue items to a closed queue!"
		end
		
		# Remove and return the next item from the queue.
		# @parameter timeout [Numeric, nil] Maximum time to wait for an item. If nil, waits indefinitely. If 0, returns immediately.
		# @returns [Object, nil] The dequeued item, or nil if timeout expires.
		def dequeue(timeout: nil)
			@delegate.pop(timeout: timeout)
		end
		
		# Compatibility with {::Queue#pop}.
		# @parameter timeout [Numeric, nil] Maximum time to wait for an item. If nil, waits indefinitely. If 0, returns immediately.
		# @returns [Object, nil] The dequeued item, or nil if timeout expires.
		def pop(timeout: nil)
			@delegate.pop(timeout: timeout)
		end
		
		# Process each item in the queue.
		#
		# @asynchronous Executes the given block concurrently for each item.
		#
		# @parameter arguments [Array] The arguments to pass to the block.
		# @parameter parent [Interface(:async) | Nil] The parent task to use for async operations.
		# @parameter options [Hash] The options to pass to the task.
		# @yields {|task| ...} When the system is idle, the block will be executed in a new task.
		def async(parent: (@parent or Task.current), **options, &block)
			while item = self.dequeue
				parent.async(item, **options, &block)
			end
		end
		
		# Enumerate each item in the queue.
		def each
			while item = self.dequeue
				yield item
			end
		end
		
		# Signal the queue with a value, the same as {#enqueue}.
		def signal(value = nil)
			self.enqueue(value)
		end
		
		# Wait for an item to be available, the same as {#dequeue}.
		def wait
			self.dequeue
		end
	end
	
	# A thread-safe queue which limits the number of items that can be enqueued.
	#
	# @public Since *Async v1*.
	class LimitedQueue < Queue
		# @private This exists purely for emitting a warning.
		def self.new(...)
			warn("`require 'async/limited_queue'` to use `Async::LimitedQueue`.", uplevel: 1, category: :deprecated) if $VERBOSE
			
			super
		end
		
		# Create a new limited queue.
		#
		# @parameter limit [Integer] The maximum number of items that can be enqueued.
		# @parameter full [Notification] The notification to use for signaling when the queue is full. (ignored, for compatibility)
		def initialize(limit = 1, **options)
			super(**options, delegate: Thread::SizedQueue.new(limit))
		end
		
		# @attribute [Integer] The maximum number of items that can be enqueued.
		def limit
			@delegate.max
		end
		
		# @returns [Boolean] Whether trying to enqueue an item would block.
		def limited?
			!@delegate.closed? && @delegate.size >= @delegate.max
		end
	end
end