1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
|
#!/usr/bin/env ruby
# frozen_string_literal: true
require 'socket'
require 'fiber'
# Emit a blank line and then the interpreter's description string, so the
# output shows which Ruby produced it.
puts '', RUBY_DESCRIPTION
# Compatibility shim: when running on a Ruby older than 2.0, give String a
# stand-in #b that simply returns the receiver (no copy, no re-encoding).
# On newer interpreters nothing is defined here.
if RUBY_VERSION < "2.0"
  String.class_eval do
    def b
      self
    end
  end
end
# TODO: make these much larger, see if we're effectively batching
# even if we don't mean to...

# Message written to each worker's pipe, and the reply each worker sends back.
QUERY_TEXT = 'STATUS'.freeze
RESPONSE_TEXT = 'OK'.freeze

# Sizes come from the command line: first argument is the number of workers,
# second is the number of requests per worker. Missing arguments fall back to
# the defaults below; values are converted with #to_i.
NUM_WORKERS = ARGV.fetch(0, 10_000).to_i
NUM_REQUESTS = ARGV.fetch(1, 100).to_i
# Fiber reactor code taken from
# https://www.codeotaku.com/journal/2018-11/fibers-are-the-right-solution/index
# Small select-based reactor. It keeps two tables mapping an IO object to the
# fiber that is suspended until that IO becomes readable or writable.
class Reactor
  def initialize
    @readable = {}
    @writable = {}
  end

  # Keeps calling IO.select on every registered IO and resuming the fibers
  # whose IO was reported ready (readers first, then writers). Returns once
  # neither table has any entries left.
  def run
    until @readable.empty? && @writable.empty?
      ready_to_read, ready_to_write = IO.select(@readable.keys, @writable.keys, [])

      ready_to_read.each { |io| @readable[io].resume }
      ready_to_write.each { |io| @writable[io].resume }
    end
  end

  # Suspends the calling fiber until #run resumes it because +io+ was
  # reported readable. Returns the entry removed from the table.
  def wait_readable(io)
    park(@readable, io)
  end

  # Suspends the calling fiber until #run resumes it because +io+ was
  # reported writable. Returns the entry removed from the table.
  def wait_writable(io)
    park(@writable, io)
  end

  private

  # Registers the current fiber under +io+ in +table+, yields control, and
  # unregisters the entry once the fiber has been resumed.
  def park(table, io)
    table[io] = Fiber.current
    Fiber.yield
    table.delete(io)
  end
end
# Wraps an IO-like object so that non-blocking reads and writes which cannot
# proceed yet hand control to the reactor instead of blocking or raising.
class Wrapper
  # io      - object responding to read_nonblock and write_nonblock.
  # reactor - object responding to wait_readable(io) and wait_writable(io).
  def initialize(io, reactor)
    @io = io
    @reactor = reactor
  end

  if RUBY_VERSION >= "2.3"
    # Variant for Ruby 2.3 and later: uses `exception: false`, so a
    # would-block condition is reported as :wait_readable / :wait_writable.
    # Any other result (including nil) is returned to the caller.
    def read_nonblock(length, buffer)
      while true
        case result = @io.read_nonblock(length, buffer, exception: false)
        when :wait_readable
          @reactor.wait_readable(@io)
        when :wait_writable
          @reactor.wait_writable(@io)
        else
          return result
        end
      end
    end

    # Retries the write until the underlying IO reports something other than
    # a wait symbol, and returns that result.
    def write_nonblock(buffer)
      while true
        case result = @io.write_nonblock(buffer, exception: false)
        when :wait_readable
          @reactor.wait_readable(@io)
        when :wait_writable
          @reactor.wait_writable(@io)
        else
          return result
        end
      end
    end
  else
    # Variant for older Rubies: would-block conditions arrive as
    # IO::WaitReadable / IO::WaitWritable exceptions. Other exceptions
    # propagate to the caller.
    def read_nonblock(length, buffer)
      while true
        begin
          return @io.read_nonblock(length, buffer)
        rescue IO::WaitReadable
          @reactor.wait_readable(@io)
        rescue IO::WaitWritable
          @reactor.wait_writable(@io)
        end
      end
    end

    # Retries the write after waiting on the reactor whenever the underlying
    # IO raises a would-block exception.
    def write_nonblock(buffer)
      while true
        begin
          return @io.write_nonblock(buffer)
        rescue IO::WaitReadable
          @reactor.wait_readable(@io)
        rescue IO::WaitWritable
          @reactor.wait_writable(@io)
        end
      end
    end
  end

  # Reads until +length+ bytes have been collected, or until read_nonblock
  # returns nil, in which case whatever was collected so far is returned.
  #
  # length - number of bytes wanted.
  # buffer - optional String to fill; it is cleared first. A fresh binary
  #          String is allocated when omitted.
  #
  # Returns the buffer String.
  def read(length, buffer = nil)
    if buffer
      buffer.clear
    else
      buffer = String.new.b
    end

    # The first read lands directly in the result buffer. A nil result means
    # there is nothing more to read, so stop here.
    return buffer unless read_nonblock(length, buffer)

    # Completion is judged by the number of bytes gathered; the value
    # returned by read_nonblock is a String, never a byte count.
    chunk = String.new.b
    while buffer.bytesize < length
      break unless read_nonblock(length - buffer.bytesize, chunk)

      buffer << chunk
    end

    buffer
  end

  # Writes all of +buffer+, looping over partial writes.
  #
  # Returns the total number of bytes in +buffer+.
  def write(buffer)
    remaining = buffer.dup
    while true
      written = write_nonblock(remaining)
      return buffer.bytesize if written == remaining.bytesize

      # Drop the bytes that were accepted and retry with the rest.
      remaining = remaining.byteslice(written, remaining.bytesize - written)
    end
  end
end
# Driver: builds NUM_WORKERS worker fibers and NUM_WORKERS master-side fibers,
# each pair connected by two pipes, and runs NUM_REQUESTS query/response round
# trips per pair on a single reactor.
reactor = Reactor.new
worker_read = []
worker_write = []
master_read = []
master_write = []
workers = []

# puts "Setting up pipes..."
# Two pipes per worker: master -> worker for queries, worker -> master for
# responses. Every end is wrapped so blocking operations yield to the reactor.
NUM_WORKERS.times do
  r, w = IO.pipe
  worker_read.push Wrapper.new(r, reactor)
  master_write.push Wrapper.new(w, reactor)

  r, w = IO.pipe
  worker_write.push Wrapper.new(w, reactor)
  master_read.push Wrapper.new(r, reactor)
end

# puts "Setting up fibers..."
NUM_WORKERS.times do |i|
  f = Fiber.new do
    # Worker code: read a query, verify it, answer with the response text.
    NUM_REQUESTS.times do |req_num|
      q = worker_read[i].read(QUERY_TEXT.size)
      if q != QUERY_TEXT
        raise "Fail! Expected #{QUERY_TEXT.inspect} but got #{q.inspect} on request #{req_num.inspect}!"
      end
      worker_write[i].write(RESPONSE_TEXT)
    end
  end
  workers.push f
end

# Start every worker; each runs until its first read has to wait.
workers.each(&:resume)

# Master side: one fiber per worker sends a query and checks the response.
master_fiber = Fiber.new do
  NUM_WORKERS.times do |worker_num|
    f = Fiber.new do
      NUM_REQUESTS.times do |req_num|
        master_write[worker_num].write(QUERY_TEXT)
        buffer = master_read[worker_num].read(RESPONSE_TEXT.size)
        if buffer != RESPONSE_TEXT
          # Report the value actually received in `buffer`.
          raise "Error! Fiber no. #{worker_num} on req #{req_num} expected #{RESPONSE_TEXT.inspect} but got #{buffer.inspect}!"
        end
      end
    end
    f.resume
  end
end
master_fiber.resume

# puts "Starting reactor..."
reactor.run
# puts "Done, finished all reactor Fibers!"
puts Process.times
# Exit
|