File: benchmark.rb

package info (click to toggle)
ruby-async 1.30.3-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 536 kB
  • sloc: ruby: 3,651; makefile: 4
file content (220 lines) | stat: -rwxr-xr-x 4,256 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
#!/usr/bin/env ruby
# frozen_string_literal: true

require 'socket'
require 'fiber'

# Banner: one empty line, then the description of the running interpreter.
puts "", RUBY_DESCRIPTION

# Compatibility shim: on interpreters whose version string sorts below "2.0",
# define String#b as a method that simply returns the receiver, so the
# `String.new.b` calls used by this script still work there.
if RUBY_VERSION < "2.0"
	String.class_eval do
		def b
			self
		end
	end
end

# TODO: make these much larger, see if we're effectively batching
# even if we don't mean to...
# Payloads exchanged on every round trip: the query written by the master
# side and the response written back by the worker side.
QUERY_TEXT = "STATUS".freeze
RESPONSE_TEXT = "OK".freeze

# Benchmark dimensions, overridable from the command line:
#   ARGV[0] - number of workers (default 10_000)
#   ARGV[1] - number of requests per worker (default 100)
NUM_WORKERS = ARGV.fetch(0, 10_000).to_i
NUM_REQUESTS = ARGV.fetch(1, 100).to_i

# Fiber reactor code taken from
# https://www.codeotaku.com/journal/2018-11/fibers-are-the-right-solution/index
# Minimal IO.select-based event loop. Fibers park themselves on an IO through
# #wait_readable / #wait_writable, and #run resumes each parked fiber once
# IO.select reports its IO as ready.
class Reactor
	def initialize
		# IO object => fiber suspended until that IO is readable / writable.
		@readable = {}
		@writable = {}
	end

	# Dispatches readiness events until no fiber is registered in either table.
	# Within one IO.select round, readable IOs are serviced before writable ones.
	def run
		until @readable.empty? && @writable.empty?
			ready_to_read, ready_to_write = IO.select(@readable.keys, @writable.keys, [])

			ready_to_read.each { |io| @readable[io].resume }

			ready_to_write.each { |io| @writable[io].resume }
		end
	end

	# Registers the calling fiber as waiting for +io+ to become readable and
	# yields. Once the fiber is resumed the registration is removed again.
	def wait_readable(io)
		@readable[io] = Fiber.current
		Fiber.yield
		@readable.delete(io)
	end

	# Registers the calling fiber as waiting for +io+ to become writable and
	# yields. Once the fiber is resumed the registration is removed again.
	def wait_writable(io)
		@writable[io] = Fiber.current
		Fiber.yield
		@writable.delete(io)
	end
end

# Fiber-aware facade over a single IO object. Every operation is attempted in
# non-blocking mode; when the IO is not ready, the calling fiber is suspended
# through the reactor and the operation is retried after it is resumed.
class Wrapper
	# io      - object responding to read_nonblock / write_nonblock.
	# reactor - object responding to wait_readable(io) / wait_writable(io).
	def initialize(io, reactor)
		@io = io
		@reactor = reactor
	end

	if RUBY_VERSION >= "2.3"
		# Status-symbol variants: `exception: false` makes the IO return
		# :wait_readable / :wait_writable instead of raising.

		# Performs one non-blocking read of at most +length+ bytes into +buffer+,
		# waiting on the reactor for as long as the IO asks to be retried.
		# Returns whatever the underlying read_nonblock returns otherwise.
		def read_nonblock(length, buffer)
			while true
				case result = @io.read_nonblock(length, buffer, exception: false)
				when :wait_readable
					@reactor.wait_readable(@io)
				when :wait_writable
					@reactor.wait_writable(@io)
				else
					return result
				end
			end
		end

		# Performs one non-blocking write of +buffer+, waiting on the reactor for
		# as long as the IO asks to be retried. Returns the underlying result
		# (compared against a byte count by #write).
		def write_nonblock(buffer)
			while true
				case result = @io.write_nonblock(buffer, exception: false)
				when :wait_readable
					@reactor.wait_readable(@io)
				when :wait_writable
					@reactor.wait_writable(@io)
				else
					return result
				end
			end
		end
	else
		# Exception-based variants for older interpreters: the retry request
		# arrives as IO::WaitReadable / IO::WaitWritable.

		# Same contract as the status-symbol read_nonblock above.
		def read_nonblock(length, buffer)
			while true
				begin
					return @io.read_nonblock(length, buffer)
				rescue IO::WaitReadable
					@reactor.wait_readable(@io)
				rescue IO::WaitWritable
					@reactor.wait_writable(@io)
				end
			end
		end

		# Same contract as the status-symbol write_nonblock above.
		def write_nonblock(buffer)
			while true
				begin
					return @io.write_nonblock(buffer)
				rescue IO::WaitReadable
					@reactor.wait_readable(@io)
				rescue IO::WaitWritable
					@reactor.wait_writable(@io)
				end
			end
		end
	end

	# Reads until +length+ bytes have been collected or read_nonblock returns
	# nil, which is treated as end of stream.
	#
	# length - number of bytes wanted.
	# buffer - optional String to fill; it is cleared first. A fresh binary
	#          String is allocated when omitted.
	#
	# Returns the buffer; it holds fewer than +length+ bytes only when the
	# stream ended early.
	def read(length, buffer = nil)
		if buffer
			buffer.clear
		else
			buffer = String.new.b
		end

		# The first attempt reads straight into the result buffer. A nil result
		# means there is nothing more to read, so stop right away.
		return buffer unless self.read_nonblock(length, buffer)

		# Short read: keep pulling the missing bytes through a scratch string.
		# The loop is skipped entirely when the first read already delivered
		# everything, so no extra read or allocation-heavy work happens then.
		chunk = String.new.b
		while buffer.bytesize < length
			break unless self.read_nonblock(length - buffer.bytesize, chunk)

			buffer << chunk
		end

		return buffer
	end

	# Writes all of +buffer+, retrying with the unwritten tail after every
	# partial write.
	#
	# Returns the total number of bytes written (buffer.bytesize).
	def write(buffer)
		# Work on a private copy; the caller's string is never modified.
		remaining = buffer.dup

		while true
			written = self.write_nonblock(remaining)

			return buffer.bytesize if written == remaining.bytesize

			# Drop the bytes that went out and retry with the rest.
			remaining = remaining.byteslice(written, remaining.bytesize - written)
		end
	end
end

# Benchmark driver: NUM_WORKERS worker fibers each answer NUM_REQUESTS queries
# sent by a matching master-side fiber, with all IO going through pipes that
# are multiplexed by a single Reactor.
reactor = Reactor.new

# Index i of every array belongs to worker i:
#   master_write[i] -> worker_read[i]   carries queries
#   worker_write[i] -> master_read[i]   carries responses
worker_read = []
worker_write = []

master_read = []
master_write = []

workers = []

# puts "Setting up pipes..."
NUM_WORKERS.times do
	r, w = IO.pipe
	worker_read.push Wrapper.new(r, reactor)
	master_write.push Wrapper.new(w, reactor)

	r, w = IO.pipe
	worker_write.push Wrapper.new(w, reactor)
	master_read.push Wrapper.new(r, reactor)
end

# puts "Setting up fibers..."
NUM_WORKERS.times do |i|
	f = Fiber.new do
		# Worker code: read one query, verify it, send one response.
		NUM_REQUESTS.times do |req_num|
			# Wrapper#read takes a byte count, hence bytesize rather than size.
			q = worker_read[i].read(QUERY_TEXT.bytesize)
			if q != QUERY_TEXT
				raise "Fail! Expected #{QUERY_TEXT.inspect} but got #{q.inspect} on request #{req_num.inspect}!"
			end
			worker_write[i].write(RESPONSE_TEXT)
		end
	end
	workers.push f
end

# Start every worker; each one runs until its first read has to wait and then
# yields back here.
workers.each { |f| f.resume }

# The master side: one fiber per worker, all started from inside master_fiber.
master_fiber = Fiber.new do
	NUM_WORKERS.times do |worker_num|
		f = Fiber.new do
			NUM_REQUESTS.times do |req_num|
				master_write[worker_num].write(QUERY_TEXT)
				buffer = master_read[worker_num].read(RESPONSE_TEXT.bytesize)
				if buffer != RESPONSE_TEXT
					# Report the value actually received (the local is `buffer`).
					raise "Error! Fiber no. #{worker_num} on req #{req_num} expected #{RESPONSE_TEXT.inspect} but got #{buffer.inspect}!"
				end
			end
		end
		f.resume
	end
end

master_fiber.resume

# puts "Starting reactor..."
# Runs until no fiber is left waiting on any pipe.
reactor.run

# puts "Done, finished all reactor Fibers!"

# Report the process CPU times accumulated by the run.
puts Process.times

# Exit