1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
|
# frozen_string_literal: true
# :markup: markdown
require "nio"
module ActionCable
  module Connection
    # Multiplexes stream IO on a single background thread.
    #
    # The loop thread owns an `NIO::Selector`; other threads never touch the
    # selector directly. Instead they push a lambda onto `@todo` and wake the
    # selector, and the loop thread runs those lambdas before its next `select`.
    # A `Concurrent::ThreadPoolExecutor` is created alongside the loop thread and
    # receives the tasks handed to #post.
    #
    # The selector, executor and thread are all created lazily by the first call
    # that needs them.
    class StreamEventLoop
      def initialize
        @map = {}                 # io => selector monitor
        @todo = Queue.new         # lambdas waiting to run on the loop thread
        @spawn_mutex = Mutex.new  # guards creation of the loop thread
        @stopping = false
        @nio = nil
        @executor = nil
        @thread = nil
      end

      # Builds a `Concurrent::TimerTask` that runs `block` with the given
      # `execution_interval`, starts it, and returns the task.
      def timer(interval, &block)
        timer_task = Concurrent::TimerTask.new(execution_interval: interval, &block)
        timer_task.execute
        timer_task
      end

      # Hands `task` (or, when no task is given, the block) to the executor,
      # starting the loop thread and executor first if they are not running.
      def post(task = nil, &block)
        job = task || block

        spawn
        @executor << job
      end

      # Queues registration of `io` with the selector for read interest, with
      # `stream` stored as the monitor's value, then wakes the loop.
      def attach(io, stream)
        register = lambda do
          monitor = @nio.register(io, :r)
          @map[io] = monitor
          monitor.value = stream
        end

        @todo << register
        wakeup
      end

      # Queues deregistration of `io` from the selector, removal of its monitor
      # from the map, and closing of `io`, then wakes the loop. `stream` is
      # accepted but not used here.
      def detach(io, stream)
        unregister = lambda do
          @nio.deregister(io)
          @map.delete(io)
          io.close
        end

        @todo << unregister
        wakeup
      end

      # Queues a switch of `io`'s monitor to read+write interest (a no-op if `io`
      # has no monitor in the map), then wakes the loop.
      def writes_pending(io)
        want_writes = lambda do
          monitor = @map[io]
          monitor.interests = :rw if monitor
        end

        @todo << want_writes
        wakeup
      end

      # Flags the loop to stop. If a selector exists the loop is woken so it can
      # notice the flag; #run then closes the selector and exits.
      def stop
        @stopping = true
        return unless @nio

        wakeup
      end

      private
        # Starts the loop thread (creating the selector and executor if needed)
        # unless a live thread already exists. Returns `true` when a thread was
        # started by this call and `nil` otherwise.
        def spawn
          # Thread#status is falsy (nil or false) once the thread has terminated.
          return if @thread&.status

          @spawn_mutex.synchronize do
            # Checked again now that the mutex is held.
            unless @thread&.status
              @nio ||= NIO::Selector.new
              @executor ||= Concurrent::ThreadPoolExecutor.new(
                min_threads: 1,
                max_threads: 10,
                max_queue: 0,
              )
              @thread = Thread.new { run }

              true
            end
          end
        end

        # Makes sure the loop will look at `@todo`: either a thread was just
        # started (returns `true`), or the existing selector is woken.
        def wakeup
          started = spawn
          return started if started

          @nio.wakeup
        end

        # Body of the loop thread: until stopped, run queued lambdas, wait on the
        # selector, and service every monitor it reports as ready.
        def run
          loop do
            if @stopping
              @nio.close
              break
            end

            drain_todo

            ready = @nio.select
            next unless ready

            ready.each { |monitor| service(monitor) }
          end
        end

        # Runs every lambda currently waiting in `@todo`.
        def drain_todo
          @todo.pop(true).call until @todo.empty?
        end

        # Handles one ready monitor: flushes pending writes when it is writable,
        # then reads up to 4096 bytes and passes them to the stream.
        def service(monitor)
          io = monitor.io
          stream = monitor.value

          begin
            if monitor.writable?
              # Once the write buffer reports flushed, stop asking for writability.
              monitor.interests = :r if stream.flush_write_buffer
              return unless monitor.readable?
            end

            chunk = io.read_nonblock(4096, exception: false)
            case chunk
            when nil
              # read_nonblock returned nil: close the stream.
              stream.close
            when :wait_readable
              # Nothing to read right now.
              nil
            else
              stream.receive(chunk)
            end
          rescue
            # EOFError or Errno::ECONNRESET is expected in normal operation (the
            # client went away). For any other failure, closing the stream is
            # still the best available response.
            begin
              stream.close
            rescue
              # Closing failed too; drop the io from the selector and the map.
              @nio.deregister(io)
              @map.delete(io)
            end
          end
        end
    end
  end
end
|