File: selector.rb

package info (click to toggle)
ruby-io-event 1.14.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 400 kB
  • sloc: ansic: 3,709; ruby: 736; makefile: 4
file content (171 lines) | stat: -rw-r--r-- 5,050 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2021-2025, by Samuel Williams.

require_relative "../support"

module IO::Event
	# @namespace
	module Debug
		# Enforces the selector interface and delegates operations to a wrapped selector instance.
		#
		# You can enable this in the default selector by setting the `IO_EVENT_DEBUG_SELECTOR` environment variable. In addition, you can log all selector operations to a file by setting the `IO_EVENT_DEBUG_SELECTOR_LOG` environment variable. This is useful for debugging and understanding the behavior of the event loop.
		class Selector
			# Wrap the given selector with debugging.
			#
			# @parameter selector [Selector] The selector to wrap.
			# @parameter env [Hash] The environment to read configuration from.
			def self.wrap(selector, env = ENV)
				log = nil
				
				if log_path = env["IO_EVENT_DEBUG_SELECTOR_LOG"]
					log = File.open(log_path, "w")
				end
				
				return self.new(selector, log: log)
			end
			
			# Initialize the debug selector with the given selector and optional log.
			#
			# @parameter selector [Selector] The selector to wrap.
			# @parameter log [IO] The log to write debug messages to.
			def initialize(selector, log: nil)
				@selector = selector
				
				@readable = {}
				@writable = {}
				@priority = {}
				
				unless Fiber.current == selector.loop
					Kernel::raise "Selector must be initialized on event loop fiber!"
				end
				
				@log = log
			end
			
			# The idle duration of the underlying selector.
			#
			# @returns [Numeric] The idle duration.
			def idle_duration
				@selector.idle_duration
			end
			
			# The current time.
			#
			# @returns [Numeric] The current time.
			def now
				Process.clock_gettime(Process::CLOCK_MONOTONIC)
			end
			
			# Log the given message.
			#
			# @asynchronous Will block the calling fiber and the entire event loop.
			def log(message)
				return unless @log
				
				Fiber.blocking do
					@log.puts("T+%10.1f; %s" % [now, message])
				end
			end
			
			# Wakeup the the selector.
			def wakeup
				@selector.wakeup
			end
			
			# Close the selector.
			def close
				log("Closing selector")
				
				if @selector.nil?
					Kernel::raise "Selector already closed!"
				end
				
				@selector.close
				@selector = nil
			end
			
			# Transfer from the calling fiber to the selector.
			def transfer
				log("Transfering to event loop")
				@selector.transfer
			end
			
			# Resume the given fiber with the given arguments.
			def resume(*arguments)
				log("Resuming fiber with #{arguments.inspect}")
				@selector.resume(*arguments)
			end
			
			# Yield to the selector.
			def yield
				log("Yielding to event loop")
				@selector.yield
			end
			
			# Push the given fiber to the selector ready list, such that it will be resumed on the next call to {select}.
			#
			# @parameter fiber [Fiber] The fiber that is ready.
			def push(fiber)
				log("Pushing fiber #{fiber.inspect} to ready list")
				@selector.push(fiber)
			end
			
			# Raise the given exception on the given fiber.
			#
			# @parameter fiber [Fiber] The fiber to raise the exception on.
			# @parameter arguments [Array] The arguments to use when raising the exception.
			def raise(fiber, *arguments, **options)
				log("Raising exception on fiber #{fiber.inspect} with #{arguments.inspect}")
				@selector.raise(fiber, *arguments, **options)
			end
			
			# Check if the selector is ready.
			#
			# @returns [Boolean] Whether the selector is ready.
			def ready?
				@selector.ready?
			end
			
			# Wait for the given process, forwarded to the underlying selector.
			def process_wait(*arguments)
				log("Waiting for process with #{arguments.inspect}")
				@selector.process_wait(*arguments)
			end
			
			# Wait for the given IO, forwarded to the underlying selector.
			def io_wait(fiber, io, events)
				log("Waiting for IO #{io.inspect} for events #{events.inspect}")
				@selector.io_wait(fiber, io, events)
			end
			
			# Read from the given IO, forwarded to the underlying selector.
			def io_read(fiber, io, buffer, length, offset = 0)
				log("Reading from IO #{io.inspect} with buffer #{buffer}; length #{length} offset #{offset}")
				@selector.io_read(fiber, io, buffer, length, offset)
			end
			
			# Write to the given IO, forwarded to the underlying selector.
			def io_write(fiber, io, buffer, length, offset = 0)
				log("Writing to IO #{io.inspect} with buffer #{buffer}; length #{length} offset #{offset}")
				@selector.io_write(fiber, io, buffer, length, offset)
			end
			
			# Forward the given method to the underlying selector.
			def respond_to?(name, include_private = false)
				@selector.respond_to?(name, include_private)
			end
			
			# Select for the given duration, forwarded to the underlying selector.
			def select(duration = nil)
				log("Selecting for #{duration.inspect}")
				unless Fiber.current == @selector.loop
					Kernel::raise "Selector must be run on event loop fiber!"
				end
				
				@selector.select(duration)
			end
		end
	end
end