File: controller.rb

package info (click to toggle)
ruby-async-pool 0.3.12-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 188 kB
  • sloc: ruby: 535; makefile: 4
file content (305 lines) | stat: -rw-r--r-- 6,884 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2019-2022, by Samuel Williams.
# Copyright, 2020, by Simon Perepelitsa.

require 'console/logger'

require 'async'
require 'async/notification'
require 'async/semaphore'

module Async
	module Pool
		class Controller
			def self.wrap(**options, &block)
				self.new(block, **options)
			end
			
			def initialize(constructor, limit: nil, concurrency: nil)
				# All available resources:
				@resources = {}
				
				# Resources which may be available to be acquired:
				# This list may contain false positives, or resources which were okay but have since entered a state which is unusuable.
				@available = []
				
				@notification = Async::Notification.new
				
				@limit = limit
				
				@constructor = constructor
				
				# Set the concurrency to be the same as the limit for maximum performance:
				if limit
					concurrency ||= limit
				else
					concurrency ||= 1
				end
				
				@guard = Async::Semaphore.new(concurrency)
				
				@gardener = nil
			end
			
			# @attribute [Hash(Resource, Integer)] all allocated resources, and their associated usage.
			attr :resources
			
			def size
				@resources.size
			end
			
			# Whether the pool has any active resources.
			def active?
				!@resources.empty?
			end
			
			# Whether there are resources which are currently in use.
			def busy?
				@resources.collect do |_, usage|
					return true if usage > 0
				end
				
				return false
			end
			
			# Whether there are available resources, i.e. whether {#acquire} can reuse an existing resource.
			def available?
				@available.any?
			end
			
			# Wait until a pool resource has been freed.
			def wait
				@notification.wait
			end
			
			def empty?
				@resources.empty?
			end
			
			def acquire
				resource = wait_for_resource
				
				return resource unless block_given?
				
				begin
					yield resource
				ensure
					release(resource)
				end
			end
			
			# Make the resource resources and let waiting tasks know that there is something resources.
			def release(resource)
				processed = false
				
				# A resource that is not good should also not be reusable.
				if resource.reusable?
					processed = reuse(resource)
				end
			ensure
				retire(resource) unless processed
			end
			
			def close
				@available.clear
				
				while pair = @resources.shift
					resource, usage = pair
					
					if usage > 0
						Console.logger.warn(self, resource: resource, usage: usage) {"Closing resource while still in use!"}
					end
					
					resource.close
				end
				
				@gardener&.stop
			end
			
			def to_s
				if @resources.empty?
					"\#<#{self.class}(#{usage_string})>"
				else
					"\#<#{self.class}(#{usage_string}) #{availability_string}>"
				end
			end
			
			# Retire (and close) all unused resources. If a block is provided, it should implement the desired functionality for unused resources.
			# @param retain [Integer] the minimum number of resources to retain.
			# @yield resource [Resource] unused resources.
			def prune(retain = 0)
				unused = []
				
				# This code must not context switch:
				@resources.each do |resource, usage|
					if usage.zero?
						unused << resource
					end
				end
				
				# It's okay for this to context switch:
				unused.each do |resource|
					if block_given?
						yield resource
					else
						retire(resource)
					end
					
					break if @resources.size <= retain
				end
				
				# Update availability list:
				@available.clear
				@resources.each do |resource, usage|
					if usage < resource.concurrency and resource.reusable?
						@available << resource
					end
				end
				
				return unused.size
			end
			
			def retire(resource)
				Console.logger.debug(self) {"Retire #{resource}"}
				
				@resources.delete(resource)
				
				resource.close
				
				@notification.signal
				
				return true
			end
			
			protected
			
			def start_gardener
				return if @gardener
				
				Async(transient: true, annotation: "#{self.class} Gardener") do |task|
					@gardener = task
					
					Task.yield
				ensure
					@gardener = nil
					self.close
				end
			end
			
			def usage_string
				"#{@resources.size}/#{@limit || '∞'}"
			end
			
			def availability_string
				@resources.collect do |resource,usage|
					"#{usage}/#{resource.concurrency}#{resource.viable? ? nil : '*'}/#{resource.count}"
				end.join(";")
			end
			
			def usage
				@resources.count{|resource, usage| usage > 0}
			end
			
			def free
				@resources.count{|resource, usage| usage == 0}
			end
			
			def reuse(resource)
				Console.logger.debug(self) {"Reuse #{resource}"}
				usage = @resources[resource]
				
				if usage.zero?
					raise "Trying to reuse unacquired resource: #{resource}!"
				end
				
				# If the resource was fully utilized, it now becomes available:
				if usage == resource.concurrency
					@available.push(resource)
				end
				
				@resources[resource] = usage - 1
				
				@notification.signal
				
				return true
			end
			
			def wait_for_resource
				# If we fail to create a resource (below), we will end up waiting for one to become resources.
				until resource = available_resource
					@notification.wait
				end
				
				Console.logger.debug(self) {"Wait for resource -> #{resource}"}
				
				# if resource.concurrency > 1
				# 	@notification.signal
				# end
				
				return resource
			end
			
			# @returns [Object] A new resource in a "used" state.
			def create_resource
				self.start_gardener
				
				# This might return nil, which means creating the resource failed.
				if resource = @constructor.call
					@resources[resource] = 1
					
					# Make the resource available if it can be used multiple times:
					if resource.concurrency > 1
						@available.push(resource)
					end
				end
				
				return resource
			end
			
			# @returns [Object] An existing resource in a "used" state.
			def available_resource
				resource = nil
				
				@guard.acquire do
					resource = get_resource
				end
				
				return resource
			rescue Exception
				reuse(resource) if resource
				raise
			end
			
			private def get_resource
				while resource = @available.last
					if usage = @resources[resource] and usage < resource.concurrency
						if resource.viable?
							usage = (@resources[resource] += 1)
							
							if usage == resource.concurrency
								# The resource is used up to it's limit:
								@available.pop
							end
							
							return resource
						else
							retire(resource)
							@available.pop
						end
					else
						# The resource has been removed already, so skip it and remove it from the availability list.
						@available.pop
					end
				end
				
				if @limit.nil? or @resources.size < @limit
					Console.logger.debug(self) {"No available resources, allocating new one..."}
					
					return create_resource
				end
			end
		end
	end
end