File: select.rb

package info (click to toggle)
ruby-io-event 1.14.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 400 kB
  • sloc: ansic: 3,709; ruby: 736; makefile: 4
file content (426 lines) | stat: -rw-r--r-- 10,719 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2021-2025, by Samuel Williams.
# Copyright, 2023, by Math Ieu.

require_relative "../interrupt"
require_relative "../support"

module IO::Event
	module Selector
		# A pure-Ruby implementation of the event selector.
		class Select
			# Create a selector bound to the given event loop fiber.
			#
			# @parameter loop [Fiber] The event loop fiber that waiting fibers transfer back to.
			def initialize(loop)
				@loop = loop
				
				# Fibers (or ready-list entries) to be resumed at the start of the next {select}.
				@ready = Queue.new
				
				# Waiters registered by {io_wait}, keyed by the identity of the IO object.
				@waiting = {}.compare_by_identity
				
				# True only while the selector is blocked inside `::IO.select`.
				# {wakeup} reads this flag to decide whether the interrupt needs to be signalled.
				@blocked = false
				
				@interrupt = Interrupt.attach(self)
				
				# Time spent idle during the most recent {select} call.
				@idle_duration = 0.0
			end
			
			# @attribute [Fiber] The event loop fiber.
			attr_reader :loop
			
			# @attribute [Float] The amount of time the event loop spent idle during the most recent select call.
			attr_reader :idle_duration
			
			# Interrupt the event loop if it is currently blocked waiting for events.
			#
			# @returns [Boolean] True if the interrupt was signalled, false if the event loop was not blocked.
			def wakeup
				# Nothing to interrupt unless the loop is blocked:
				return false unless @blocked
				
				@interrupt.signal
				
				true
			end
			
			# Shut down the selector: close the interrupt, then drop the references to the event loop and the waiting list.
			def close
				@interrupt.close
				
				@loop = @waiting = nil
			end
			
			# A ready-list entry whose fiber can be cleared after it has been queued. Once cleared, the entry reports that it is not alive and transferring to it does nothing.
			Optional = Struct.new(:fiber) do
				# Transfer to the wrapped fiber, forwarding the given arguments.
				#
				# @returns [Object | Nil] The result of the transfer, or nil if the fiber has been cleared.
				def transfer(*arguments)
					target = self.fiber
					
					target.transfer(*arguments) unless target.nil?
				end
				
				# @returns [Boolean | Nil] Whether the wrapped fiber is alive, or nil if the fiber has been cleared.
				def alive?
					target = self.fiber
					
					target.alive? unless target.nil?
				end
				
				# Clear the wrapped fiber.
				def nullify
					self[:fiber] = nil
				end
			end
			
			# Hand control from the current fiber over to the event loop fiber.
			#
			# @returns [Object] Whatever the event loop's transfer returns.
			def transfer
				return @loop.transfer
			end
			
			# Switch from the current fiber to the given fiber, after queueing the current fiber on the ready list.
			#
			# @parameter fiber [Fiber] The fiber to transfer to.
			# @parameter arguments [Array] Arguments forwarded to the fiber's transfer.
			def resume(fiber, *arguments)
				# Queue a clearable entry for the current fiber:
				current = Optional.new(Fiber.current)
				@ready << current
				
				fiber.transfer(*arguments)
			ensure
				# Control is back here (or the transfer failed), so the queued entry must no longer resume this fiber:
				current.nullify
			end
			
			# Give control back to the event loop, after queueing the current fiber on the ready list.
			def yield
				# Queue a clearable entry for the current fiber:
				current = Optional.new(Fiber.current)
				@ready << current
				
				@loop.transfer
			ensure
				# Control is back here (or the transfer failed), so the queued entry must no longer resume this fiber:
				current.nullify
			end
			
			# Add the given fiber to the end of the ready list.
			#
			# @parameter fiber [Fiber] The fiber to queue.
			def push(fiber)
				@ready << fiber
			end
			
			# Raise an exception on the given fiber, after queueing the current fiber on the ready list.
			#
			# @parameter fiber [Fiber] The fiber to raise the exception on.
			# @parameter arguments [Array] Positional arguments forwarded to the fiber's raise.
			# @parameter options [Hash] Keyword arguments forwarded to the fiber's raise.
			def raise(fiber, *arguments, **options)
				# Queue a clearable entry for the current fiber:
				current = Optional.new(Fiber.current)
				@ready << current
				
				fiber.raise(*arguments, **options)
			ensure
				# Control is back here (or the raise failed), so the queued entry must no longer resume this fiber:
				current.nullify
			end
			
			# @returns [Boolean] True when the ready list holds at least one entry waiting to be resumed, false when it is empty.
			def ready?
				return false if @ready.empty?
				
				true
			end
			
			# One link in a chain of fibers waiting on the same IO: the waiting fiber, the events it is interested in, and the next waiter in the chain (if any).
			Waiter = Struct.new(:fiber, :events, :tail) do
				# @returns [Boolean | Nil] Whether the waiting fiber is alive, or nil if this waiter has no fiber.
				def alive?
					self.fiber&.alive?
				end
				
				# Deliver the given events along the chain of waiters, starting with this one.
				#
				# A live fiber waiting for at least one of the events is cleared from its waiter and transferred to with the matching events. A live fiber waiting for none of them is handed to the given block so it can be reactivated. A dead fiber is simply cleared.
				#
				# @parameter events [Integer] The events which are ready.
				def dispatch(events, &reactivate)
					# Remember the next link before doing anything else, because reactivation may rewrite `tail` on this waiter:
					following = self.tail
					
					if (waiting = self.fiber)
						# nil marks a dead fiber; otherwise the subset of requested events which are ready:
						matched = waiting.alive? ? (events & self.events) : nil
						
						if matched.nil?
							self.fiber = nil
						elsif matched.zero?
							reactivate.call(self)
						else
							# Clear the fiber before transferring so this waiter cannot be dispatched twice:
							self.fiber = nil
							waiting.transfer(matched)
						end
					end
					
					following&.dispatch(events, &reactivate)
				end
				
				# Clear the fiber so that this waiter is skipped from now on.
				def invalidate
					self.fiber = nil
				end
				
				# Enumerate the chain, yielding the fiber and events of every waiter that still has a fiber.
				def each(&block)
					current = self.fiber
					yield(current, self.events) if current
					
					self.tail&.each(&block)
				end
			end
			
			# Register the fiber as waiting for events on the given IO, then transfer to the event loop.
			#
			# @parameter fiber [Fiber] The fiber that is waiting.
			# @parameter io [IO] The IO object to wait on.
			# @parameter events [Integer] The events to wait for.
			# @returns [Object] The value passed back when control returns from the event loop.
			def io_wait(fiber, io, events)
				# Any waiter already registered for this IO becomes the tail of the new one:
				existing = @waiting[io]
				waiter = (@waiting[io] = Waiter.new(fiber, events, existing))
				
				@loop.transfer
			ensure
				# Whether we were resumed normally or by an exception, this waiter must not be dispatched again:
				waiter&.invalidate
			end
			
			# Wait for multiple IO objects to become readable or writable, by running `IO.select` on a separate thread and waiting for its result.
			#
			# @parameter readable [Array(IO)] The list of IO objects to wait for readability.
			# @parameter writable [Array(IO)] The list of IO objects to wait for writability.
			# @parameter priority [Array(IO)] The list of IO objects to wait for priority events.
			# @parameter timeout [Numeric | Nil] The timeout passed through to `IO.select`.
			# @returns [Array | Nil] The result of `IO.select`.
			def io_select(readable, writable, priority, timeout)
				worker = Thread.new{IO.select(readable, writable, priority, timeout)}
				
				worker.value
			end
			
			# Negated errno values, for comparison against the negative results tested by the read and write loops.
			EAGAIN = -Errno::EAGAIN::Errno
			EWOULDBLOCK = -Errno::EWOULDBLOCK::Errno
			
			# Whether the given error code indicates that the operation should be retried.
			#
			# @parameter errno [Integer] A negated errno value.
			# @returns [Boolean] True for {EAGAIN} or {EWOULDBLOCK}.
			protected def again?(errno)
				case errno
				when EAGAIN, EWOULDBLOCK
					true
				else
					false
				end
			end
			
			if Support.fiber_scheduler_v3?
				# Ruby 3.3+, full IO::Buffer support.
				
				# Read from the given IO into the buffer.
				#
				# @parameter fiber [Fiber] The fiber performing the read; it is registered with {io_wait} while the read must be retried.
				# @parameter io [IO] The IO object to read from.
				# @parameter buffer [IO::Buffer] The buffer to read into.
				# @parameter length [Integer] The minimum number of bytes to read.
				# @parameter offset [Integer] The offset into the buffer to read to.
				# @returns [Integer] The total number of bytes read, or the negative result of the failed read.
				def io_read(fiber, io, buffer, length, offset = 0)
					total = 0
					
					Selector.nonblock(io) do
						while true
							result = Fiber.blocking{buffer.read(io, 0, offset)}
							
							if result > 0
								total += result
								break if total >= length
								offset += result
							elsif result == 0
								# Nothing was read, so stop and report what we have:
								break
							elsif length > 0 and again?(result)
								# The read should be retried once the IO becomes readable:
								self.io_wait(fiber, io, IO::READABLE)
							else
								# Either a hard error, or no minimum length was requested:
								return result
							end
						end
					end
					
					return total
				end
				
				# Write to the given IO from the buffer.
				#
				# @parameter fiber [Fiber] The fiber performing the write; it is registered with {io_wait} while the write must be retried.
				# @parameter io [IO] The IO object to write to.
				# @parameter buffer [IO::Buffer] The buffer containing the data to write.
				# @parameter length [Integer] The minimum number of bytes to write.
				# @parameter offset [Integer] The offset into the buffer to write from.
				# @returns [Integer] The total number of bytes written, or the negative result of the failed write.
				def io_write(fiber, io, buffer, length, offset = 0)
					total = 0
					
					Selector.nonblock(io) do
						while true
							result = Fiber.blocking{buffer.write(io, 0, offset)}
							
							if result < 0
								if length > 0 and again?(result)
									# The write would block, so wait until the IO can accept more data. Waiting for readability here could stall until the peer happens to send something.
									self.io_wait(fiber, io, IO::WRITABLE)
								else
									return result
								end
							elsif result == 0
								# Nothing was written, so stop and report what we have:
								break
							else
								total += result
								break if total >= length
								offset += result
							end
						end
					end
					
					return total
				end
			elsif Support.fiber_scheduler_v2?
				# Ruby 3.2, most IO::Buffer support, but slightly clunky read/write methods.
				# Read from the given IO into the buffer.
				#
				# @parameter fiber [Fiber] The fiber performing the read; it is registered with {io_wait} while the read must be retried.
				# @parameter io [IO] The IO object to read from.
				# @parameter buffer [IO::Buffer] The buffer to read into.
				# @parameter length [Integer] The minimum number of bytes to read.
				# @parameter offset [Integer] The offset into the buffer to read to.
				# @returns [Integer] The total number of bytes read, or the negative result of the failed read.
				def io_read(fiber, io, buffer, length, offset = 0)
					total = 0
					
					Selector.nonblock(io) do
						# Each read may fill whatever space remains after the current offset:
						maximum_size = buffer.size - offset
						while maximum_size > 0
							result = Fiber.blocking{buffer.read(io, maximum_size, offset)}
							
							if again?(result)
								if length > 0
									self.io_wait(fiber, io, IO::READABLE)
								else
									return result
								end
							elsif result < 0
								return result
							elsif result == 0
								# Nothing more to read (e.g. end of stream). Without this the loop would spin forever whenever fewer than `length` bytes are available.
								break
							else
								total += result
								offset += result
								break if total >= length
							end
							
							maximum_size = buffer.size - offset
						end
					end
					
					return total
				end
				
				# Write to the given IO from the buffer.
				#
				# @parameter fiber [Fiber] The fiber performing the write; it is registered with {io_wait} while the write must be retried.
				# @parameter io [IO] The IO object to write to.
				# @parameter buffer [IO::Buffer] The buffer containing the data to write.
				# @parameter length [Integer] The minimum number of bytes to write.
				# @parameter offset [Integer] The offset into the buffer to write from.
				# @returns [Integer] The total number of bytes written, or the negative result of the failed write.
				def io_write(fiber, io, buffer, length, offset = 0)
					total = 0
					
					Selector.nonblock(io) do
						# Each write may send whatever remains after the current offset:
						maximum_size = buffer.size - offset
						while maximum_size > 0
							result = Fiber.blocking{buffer.write(io, maximum_size, offset)}
							
							if again?(result)
								if length > 0
									# The write would block, so wait until the IO can accept more data. Waiting for readability here could stall until the peer happens to send something.
									self.io_wait(fiber, io, IO::WRITABLE)
								else
									return result
								end
							elsif result < 0
								return result
							elsif result == 0
								# No progress was made, so stop rather than spin:
								break
							else
								total += result
								offset += result
								break if total >= length
							end
							
							maximum_size = buffer.size - offset
						end
					end
					
					return total
				end
			end
			
			# Wait for a process to change state, by running `Process::Status.wait` on a separate thread and waiting for its result.
			#
			# @parameter fiber [Fiber] The fiber that is waiting. Not used by this implementation.
			# @parameter pid [Integer] The process ID to wait for.
			# @parameter flags [Integer] Flags to pass to Process::Status.wait.
			# @returns [Process::Status] The status of the waited process.
			def process_wait(fiber, pid, flags)
				worker = Thread.new{Process::Status.wait(pid, flags)}
				
				worker.value
			end
			
			# Resume every entry that was on the ready list when this method was called.
			#
			# @returns [Boolean | Nil] True if the ready list had entries to process, nil if it was empty.
			private def pop_ready
				return if @ready.empty?
				
				# The size is sampled once, so entries queued while these fibers run are left for the next pass:
				@ready.size.times do
					entry = @ready.pop
					entry.transfer if entry.alive?
				end
				
				true
			end
			
			# Wait for IO events or a timeout.
			#
			# Entries on the ready list are resumed first. Then `::IO.select` is called with the IO objects found in the waiting list, and the resulting events are dispatched to the waiters registered for each IO.
			#
			# If an exception interrupts the blocking call, it is re-raised on the current thread and 0 is returned.
			#
			# @parameter duration [Numeric | Nil] The maximum time to wait, or nil for no timeout.
			# @returns [Integer] The number of ready IO objects.
			def select(duration = nil)
				if pop_ready
					# If we have popped items from the ready list, they may influence the duration calculation, so we don't delay the event loop:
					duration = 0
				end
				
				readable = Array.new
				writable = Array.new
				priority = Array.new
				
				# Build the three lists for ::IO.select from the waiting list, dropping entries whose IO has been closed:
				@waiting.delete_if do |io, waiter|
					if io.closed?
						true
					else
						# Walk the chain of waiters for this IO; the same IO is appended once per waiter interested in that kind of event:
						waiter.each do |fiber, events|
							if (events & IO::READABLE) > 0
								readable << io
							end
							
							if (events & IO::WRITABLE) > 0
								writable << io
							end
							
							if (events & IO::PRIORITY) > 0
								priority << io
							end
						end
						
						false
					end
				end
				
				# If anything is on the ready list at this point, don't block:
				duration = 0 unless @ready.empty?
				error = nil
				
				# Idle time is only measured when a positive duration was given; for nil or a non-positive duration it is reported as 0.0:
				if duration&.>(0)
					start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
				else
					@idle_duration = 0.0
				end
				
				# We need to handle interrupts on blocking IO. Every other implementation uses EINTR, but that doesn't work with `::IO.select` as it will retry the call on EINTR.
				Thread.handle_interrupt(::Exception => :on_blocking) do
					# {wakeup} only signals the interrupt while this flag is set:
					@blocked = true
					readable, writable, priority = ::IO.select(readable, writable, priority, duration)
				rescue ::Exception => error
					# Requeue below...
				ensure
					@blocked = false
					if start_time
						end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
						@idle_duration = end_time - start_time
					end
				end
				
				if error
					# Requeue the error into the pending exception queue:
					Thread.current.raise(error)
					return 0
				end
				
				# Merge the three result lists into a single event mask per IO, keyed by identity. The lists are nil when ::IO.select timed out, hence the safe navigation:
				ready = Hash.new(0).compare_by_identity
				
				readable&.each do |io|
					ready[io] |= IO::READABLE
				end
				
				writable&.each do |io|
					ready[io] |= IO::WRITABLE
				end
				
				priority&.each do |io|
					ready[io] |= IO::PRIORITY
				end
				
				ready.each do |io, events|
					# The whole chain is removed before dispatching; waiters that were not waiting for these events are linked back in by the block:
					@waiting.delete(io).dispatch(events) do |waiter|
						# Re-schedule the waiting IO:
						waiter.tail = @waiting[io]
						@waiting[io] = waiter
					end
				end
				
				return ready.size
			end
		end
	end
end