# frozen_string_literal: true
require "test_helper"
require "concurrent"
require "websocket-client-simple"
require "json"
require "active_support/hash_with_indifferent_access"
####
# 😷 Warning suppression 😷
# Assigns @application_data_buffer to nil before handing off to the gem's own
# initializer. Per the "Warning suppression" banner above, the only purpose is
# to silence a warning (presumably Ruby's "instance variable not initialized"
# -- confirm against the gem version in use).
WebSocket::Frame::Handler::Handler03.prepend(
  Module.new do
    def initialize(*)
      @application_data_buffer = nil
      super
    end
  end
)
# Assigns @masking_key to nil before the gem's own initializer runs. This is
# part of the same warning-suppression shim as the Handler03 patch
# (presumably for "instance variable not initialized" -- confirm).
WebSocket::Frame::Data.prepend(
  Module.new do
    def initialize(*)
      @masking_key = nil
      super
    end
  end
)
#
####
class ClientTest < ActionCable::TestCase
WAIT_WHEN_EXPECTING_EVENT = 2
WAIT_WHEN_NOT_EXPECTING_EVENT = 0.5
# Channel exercised by every test in this file. It streams from the "global"
# broadcasting and exposes three actions: +ding+ (immediate reply), +delay+
# (reply after a one second sleep) and +bulk+ (broadcast to "global").
#
# Payloads are passed to +transmit+ / +broadcast+ as explicit Hash literals.
# Both methods also accept keyword options, so a brace-less hash would be
# parsed as keywords: deprecated on Ruby 2.7 and an ArgumentError on Ruby 3.
class EchoChannel < ActionCable::Channel::Base
  # Starts streaming from the "global" broadcasting.
  def subscribed
    stream_from "global"
  end

  # Only evaluates to a farewell string; nothing is transmitted here.
  def unsubscribed
    "Goodbye from EchoChannel!"
  end

  # Transmits data["message"] back under the "dong" key.
  def ding(data)
    transmit({ dong: data["message"] })
  end

  # Same reply as #ding, but only after sleeping for one second so a test can
  # close the client before the write happens.
  def delay(data)
    sleep 1
    transmit({ dong: data["message"] })
  end

  # Broadcasts data["message"] under the "wide" key to the "global" broadcasting.
  def bulk(data)
    ActionCable.server.broadcast "global", { wide: data["message"] }
  end
end
# Resets Action Cable's server and configures it for these tests: a logger
# that writes to a throwaway StringIO at UNKNOWN level, the "async" cable
# adapter, and request forgery protection disabled.
def setup
  # Clear the @server ivar so the ActionCable.server call below starts from a
  # fresh object (presumably it is memoized there -- confirm).
  ActionCable.instance_variable_set(:@server, nil)

  quiet_logger = Logger.new(StringIO.new)
  quiet_logger.level = Logger::UNKNOWN

  config = ActionCable.server.config
  config.logger = quiet_logger
  config.cable = ActiveSupport::HashWithIndifferentAccess.new(adapter: "async")

  # and now the "real" setup for our test:
  config.disable_request_forgery_protection = true
end
# Runs +rack_app+ on a Puma server bound to 127.0.0.1:+port+ (1 to 4 threads),
# yields the port to the block, and always stops the server and joins its
# thread afterwards, even when the block raises.
def with_puma_server(rack_app = ActionCable.server, port = 3099)
  server = ::Puma::Server.new(rack_app, ::Puma::Events.strings)
  server.add_tcp_listener "127.0.0.1", port
  server.min_threads = 1
  server.max_threads = 4

  thread = server.run

  begin
    yield port
  ensure
    server.stop

    begin
      thread.join
    rescue IOError, RuntimeError => error
      # IOError: work around https://bugs.ruby-lang.org/issues/13405
      #
      # Puma's sometimes raising while shutting down, when it closes
      # its internal pipe. We can safely ignore that, but we do need
      # to do the step skipped by the exception (closing the binder).
      #
      # RuntimeError: work around https://bugs.ruby-lang.org/issues/13239
      # Only the "frozen IOError" flavour is treated like the IOError above;
      # any other RuntimeError is re-raised untouched.
      frozen_io_error = error.message.include?("can't modify frozen IOError")
      raise if error.is_a?(RuntimeError) && !frozen_io_error

      server.binder.close
    end
  end
end
# Blocking WebSocket client for the tests. Event handlers push parsed messages
# (or errors) onto a queue and release a semaphore permit per item; the test
# thread reads them back with timeouts. "ping" messages are only counted.
class SyncClient
  # Counter of "ping" messages received so far.
  attr_reader :pings

  # Connects to ws://127.0.0.1:<port>/ and waits up to
  # WAIT_WHEN_EXPECTING_EVENT seconds for the connection to open. An error
  # reported before the open completes fails the promise, and +wait!+ raises it.
  def initialize(port)
    # The handlers below deliberately close over locals instead of reading the
    # instance variables. NOTE(review): presumably the gem runs handler blocks
    # with a different +self+ -- confirm before "simplifying" this.
    inbox = @messages = Queue.new
    closed_event = @closed = Concurrent::Event.new
    inbox_permits = @has_messages = Concurrent::Semaphore.new(0)
    ping_counter = @pings = Concurrent::AtomicFixnum.new(0)

    opened = Concurrent::Promise.new

    @ws = WebSocket::Client::Simple.connect("ws://127.0.0.1:#{port}/") do |ws|
      ws.on(:error) do |event|
        error = event.is_a?(Exception) ? event : RuntimeError.new(event.message)

        if opened.pending?
          opened.fail(error)
        else
          # After the connection is open, errors are queued so the next read raises them.
          inbox << error
          inbox_permits.release
        end
      end

      ws.on(:open) { opened.set(true) }

      ws.on(:message) do |event|
        if event.type == :close
          closed_event.set
          next
        end

        payload = JSON.parse(event.data)

        if payload["type"] == "ping"
          ping_counter.increment
        else
          inbox << payload
          inbox_permits.release
        end
      end

      ws.on(:close) { closed_event.set }
    end

    opened.wait!(WAIT_WHEN_EXPECTING_EVENT)
  end

  # Waits up to WAIT_WHEN_EXPECTING_EVENT seconds for a message, then pops one
  # without blocking. The wait result is not checked, so an empty queue
  # surfaces as the error raised by the non-blocking pop.
  def read_message
    @has_messages.try_acquire(1, WAIT_WHEN_EXPECTING_EVENT)
    pop_message
  end

  # Collects messages until a wait times out. The longer timeout is used while
  # fewer than +expected_size+ messages have arrived, the shorter one after that.
  def read_messages(expected_size = 0)
    collected = []

    loop do
      timeout = collected.size < expected_size ? WAIT_WHEN_EXPECTING_EVENT : WAIT_WHEN_NOT_EXPECTING_EVENT
      break unless @has_messages.try_acquire(1, timeout)

      collected << pop_message
    end

    collected
  end

  # Sends +message+ to the server as JSON.
  def send_message(message)
    encoded = JSON.generate(message)
    @ws.send(encoded)
  end

  # Pauses for WAIT_WHEN_NOT_EXPECTING_EVENT seconds, raises if any message is
  # still queued, then closes the socket and waits for the close to register.
  def close
    sleep WAIT_WHEN_NOT_EXPECTING_EVENT
    raise "#{@messages.size} messages unprocessed" unless @messages.empty?

    @ws.close
    wait_for_close
  end

  # Waits up to WAIT_WHEN_EXPECTING_EVENT seconds for the closed event.
  def wait_for_close
    @closed.wait(WAIT_WHEN_EXPECTING_EVENT)
  end

  # True once a close has been observed.
  def closed?
    @closed.set?
  end

  private
    # Non-blocking pop; queued exceptions are raised instead of returned.
    def pop_message
      item = @messages.pop(true)
      raise item if item.is_a?(Exception)

      item
    end
end
# Builds a SyncClient for the given port; SyncClient's initializer connects to
# ws://127.0.0.1:<port>/ and waits for the connection to open.
def websocket_client(port)
SyncClient.new(port)
end
# Runs the block once per element of +enum+, each in its own
# Concurrent::Future, and returns the results in order. +value!+ is used to
# collect them, so a failure inside any future is re-raised here.
def concurrently(enum)
  futures = enum.map do |*args|
    Concurrent::Future.execute { yield(*args) }
  end

  futures.map(&:value!)
end
# One client: welcome, subscribe to EchoChannel, "ding" and expect the "dong" reply.
def test_single_client
  with_puma_server do |port|
    identifier = JSON.generate(channel: "ClientTest::EchoChannel")
    expected_identifier = '{"channel":"ClientTest::EchoChannel"}'

    client = websocket_client(port)
    assert_equal({ "type" => "welcome" }, client.read_message) # pop the first welcome message off the stack

    client.send_message command: "subscribe", identifier: identifier
    assert_equal({ "identifier" => expected_identifier, "type" => "confirm_subscription" }, client.read_message)

    client.send_message command: "message", identifier: identifier, data: JSON.generate(action: "ding", message: "hello")
    assert_equal({ "identifier" => expected_identifier, "message" => { "dong" => "hello" } }, client.read_message)

    client.close
  end
end
# Ten clients subscribe and "ding" concurrently, then each triggers a "bulk"
# broadcast. Every client must end up with one broadcast per client.
def test_interacting_clients
  with_puma_server do |port|
    identifier = JSON.generate(channel: "ClientTest::EchoChannel")
    expected_identifier = '{"channel":"ClientTest::EchoChannel"}'

    clients = concurrently(10.times) { websocket_client(port) }

    # Barriers keep the clients in lock-step around the broadcast phase.
    before_bulk = Concurrent::CyclicBarrier.new(clients.size)
    after_bulk = Concurrent::CyclicBarrier.new(clients.size)

    concurrently(clients) do |client|
      assert_equal({ "type" => "welcome" }, client.read_message) # pop the first welcome message off the stack

      client.send_message command: "subscribe", identifier: identifier
      assert_equal({ "identifier" => expected_identifier, "type" => "confirm_subscription" }, client.read_message)

      client.send_message command: "message", identifier: identifier, data: JSON.generate(action: "ding", message: "hello")
      assert_equal({ "identifier" => expected_identifier, "message" => { "dong" => "hello" } }, client.read_message)

      before_bulk.wait WAIT_WHEN_EXPECTING_EVENT
      client.send_message command: "message", identifier: identifier, data: JSON.generate(action: "bulk", message: "hello")
      after_bulk.wait WAIT_WHEN_EXPECTING_EVENT

      assert_equal clients.size, client.read_messages(clients.size).size
    end

    concurrently(clients, &:close)
  end
end
# One hundred clients connect, subscribe and "ding" concurrently; each expects its own "dong".
def test_many_clients
  with_puma_server do |port|
    identifier = JSON.generate(channel: "ClientTest::EchoChannel")
    expected_identifier = '{"channel":"ClientTest::EchoChannel"}'

    clients = concurrently(100.times) { websocket_client(port) }

    concurrently(clients) do |client|
      assert_equal({ "type" => "welcome" }, client.read_message) # pop the first welcome message off the stack

      client.send_message command: "subscribe", identifier: identifier
      assert_equal({ "identifier" => expected_identifier, "type" => "confirm_subscription" }, client.read_message)

      client.send_message command: "message", identifier: identifier, data: JSON.generate(action: "ding", message: "hello")
      assert_equal({ "identifier" => expected_identifier, "message" => { "dong" => "hello" } }, client.read_message)
    end

    concurrently(clients, &:close)
  end
end
# A client that closes while a delayed reply is still pending must not stop a
# second client from connecting and getting its own reply.
def test_disappearing_client
  with_puma_server do |port|
    identifier = JSON.generate(channel: "ClientTest::EchoChannel")
    expected_identifier = '{"channel":"ClientTest::EchoChannel"}'

    first_client = websocket_client(port)
    assert_equal({ "type" => "welcome" }, first_client.read_message) # pop the first welcome message off the stack
    first_client.send_message command: "subscribe", identifier: identifier
    assert_equal({ "identifier" => expected_identifier, "type" => "confirm_subscription" }, first_client.read_message)
    first_client.send_message command: "message", identifier: identifier, data: JSON.generate(action: "delay", message: "hello")
    first_client.close # disappear before write

    second_client = websocket_client(port)
    assert_equal({ "type" => "welcome" }, second_client.read_message) # pop the first welcome message off the stack
    second_client.send_message command: "subscribe", identifier: identifier
    assert_equal({ "identifier" => expected_identifier, "type" => "confirm_subscription" }, second_client.read_message)
    second_client.send_message command: "message", identifier: identifier, data: JSON.generate(action: "ding", message: "hello")
    assert_equal({ "identifier" => expected_identifier, "message" => { "dong" => "hello" } }, second_client.read_message)
    second_client.close # disappear before read
  end
end
# Closing a subscribed client must call the channel's +unsubscribed+ callback
# and leave the server with no connections.
def test_unsubscribe_client
  with_puma_server do |port|
    server = ActionCable.server
    identifier = JSON.generate(channel: "ClientTest::EchoChannel")

    client = websocket_client(port)
    assert_equal({ "type" => "welcome" }, client.read_message)

    client.send_message command: "subscribe", identifier: identifier
    assert_equal({ "identifier" => '{"channel":"ClientTest::EchoChannel"}', "type" => "confirm_subscription" }, client.read_message)

    assert_equal(1, server.connections.count)
    # NOTE(review): this only checks that +where+ returns something truthy -- confirm intent.
    assert(server.remote_connections.where(identifier: identifier))

    # Reach into the connection's private subscription table to get the channel instance.
    subscriptions = server.connections.first.subscriptions.send(:subscriptions)
    assert_not_equal 0, subscriptions.size, "Missing EchoChannel subscription"

    channel = subscriptions.first[1]
    channel.expects(:unsubscribed)

    client.close
    sleep 0.1 # Data takes a moment to process

    # All data is removed: No more connection or subscription information!
    assert_equal(0, server.connections.count)
  end
end
# Restarting the Action Cable server must close the connection of a subscribed client.
def test_server_restart
  with_puma_server do |port|
    expected_identifier = '{"channel":"ClientTest::EchoChannel"}'

    client = websocket_client(port)
    assert_equal({ "type" => "welcome" }, client.read_message)

    client.send_message command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel")
    assert_equal({ "identifier" => expected_identifier, "type" => "confirm_subscription" }, client.read_message)

    ActionCable.server.restart
    client.wait_for_close

    assert_predicate client, :closed?
  end
end
end
|