File: controller.rb

package info (click to toggle)
ruby-async-pool 0.3.12-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 188 kB
  • sloc: ruby: 535; makefile: 4
file content (230 lines) | stat: -rw-r--r-- 4,462 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2022, by Samuel Williams.

require 'nonblocking_resource'
require 'sus/fixtures/async/reactor_context'

describe Async::Pool::Controller do
	include Sus::Fixtures::Async::ReactorContext
	
	let(:pool) {subject.new(Async::Pool::Resource)}
	
	with '#acquire' do
		it "can allocate resources" do
			object = pool.acquire
			
			expect(object).not.to be_nil
			expect(pool).to be(:busy?)
			
			pool.release(object)
			expect(pool).not.to be(:busy?)
		end
	end
	
	with '#release' do
		it "will reuse resources" do
			object = pool.acquire
			
			expect(object).to receive(:reusable?).and_return(true)
			
			pool.release(object)
			
			expect(pool).to be(:active?)
		end
		
		it "will retire unusable resources" do
			object = pool.acquire
			
			expect(object).to receive(:reusable?).and_return(false)
			
			pool.release(object)
			
			expect(pool).not.to be(:active?)
		end
		
		it "will fail when releasing an unacquired resource" do
			object = pool.acquire
			
			mock(object) do |mock|
				mock.replace(:reusable?) {true}
			end
			
			pool.release(object)
			
			expect do
				pool.release(object)
			end.to raise_exception(RuntimeError, message: /unacquired resource/)
		end
	end
	
	with '#prune' do
		it "can prune unused resources" do
			pool.acquire{}
			
			expect(pool).to be(:active?)
			
			pool.prune
			
			expect(pool).not.to be(:active?)
		end
	end
	
	with '#close' do
		it "will no longer be active" do
			object = pool.acquire
			expect(object).to receive(:reusable?).and_return(true)
			pool.release(object)
			
			pool.close
			
			expect(pool).not.to be(:active?)
		end
		
		it "should clear list of available resources" do
			object = pool.acquire
			expect(object).to receive(:reusable?).and_return(true)
			pool.release(object)
			
			expect(pool.available).not.to be(:empty?)
			
			pool.close
			
			expect(pool.available).to be(:empty?)
		end
		
		it "can acquire resource during close" do
			object = pool.acquire
			
			mock(object) do |mock|
				mock.replace(:close) do
					pool.acquire{}
				end
			end
				
			pool.release(object)
			
			pool.close
			
			expect(pool).not.to be(:active?)
		end
	end
	
	with '#to_s' do
		it "can inspect empty pool" do
			expect(pool.to_s).to be(:match?, "0/∞")
		end
	end
	
	with 'a small limit' do
		let(:pool) {subject.new(Async::Pool::Resource, limit: 1)}
		
		with '#to_s' do
			it "can inspect empty pool" do
				expect(pool.to_s).to be(:match?, "0/1")
			end
		end
		
		with '#acquire' do
			it "will limit allocations" do
				state = nil
				inner = nil
				outer = pool.acquire
				
				reactor.async do
					state = :waiting
					inner = pool.acquire
					state = :acquired
					pool.release(inner)
				end
				
				expect(state).to be == :waiting
				pool.release(outer)
				reactor.yield
				expect(state).to be == :acquired
				
				expect(outer).to be == inner
			end
		end
	end
	
	with "with non-blocking connect" do
		let(:pool) do
			subject.wrap do
				# Simulate a non-blocking connection:
				Async::Task.current.sleep(0.1)
				
				Async::Pool::Resource.new
			end
		end
		
		with '#acquire' do
			it "can reuse resources" do
				3.times do
					pool.acquire{}
				end
				
				expect(pool.size).to be == 1
			end
		end
	end
	
	with 'a busy connection pool' do
		let(:pool) {subject.new(NonblockingResource)}
		let(:timeout) {60}
		
		def failures(repeats: 500, time_scale: 0.001, &block)
			count = 0
			backtraces = Set.new
			
			Sync do |task|
				while count < repeats
					begin
						task.with_timeout(rand * time_scale, &block)
					rescue Async::TimeoutError => error
						backtraces << error.backtrace.first(10)
						count += 1
					else
						if count.zero?
							time_scale /= 2
						end
					end
				end
			end
			
			# pp backtraces
		end
		
		it "robustly releases resources" do
			failures do
				begin
					resource = pool.acquire
				ensure
					pool.release(resource) if resource
				end
			end
			
			expect(pool).not.to be(:busy?)
		end
	end
end

describe Async::Pool::Controller do
	let(:pool) {subject.new(Async::Pool::Resource)}
	
	with '#close' do
		it "closes all resources when going out of scope" do
			Async do
				object = pool.acquire
				expect(object).not.to be_nil
				pool.release(object)
				
				# There is some resource which is still open:
				expect(pool.resources).not.to be(:empty?)
			end
			
			expect(pool.resources).to be(:empty?)
		end
	end
end