1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294
|
# frozen_string_literal: true
# :markup: markdown
require "action_dispatch"
require "active_support/rescuable"
module ActionCable
module Connection
# # Action Cable Connection Base
#
# For every WebSocket connection the Action Cable server accepts, a Connection
# object will be instantiated. This instance becomes the parent of all of the
# channel subscriptions that are created from there on. Incoming messages are
# then routed to these channel subscriptions based on an identifier sent by the
# Action Cable consumer. The Connection itself does not deal with any specific
# application logic beyond authentication and authorization.
#
# Here's a basic example:
#
#     module ApplicationCable
#       class Connection < ActionCable::Connection::Base
#         identified_by :current_user
#
#         def connect
#           self.current_user = find_verified_user
#           logger.add_tags current_user.name
#         end
#
#         def disconnect
#           # Any cleanup work needed when the cable connection is cut.
#         end
#
#         private
#           def find_verified_user
#             User.find_by_identity(cookies.encrypted[:identity_id]) ||
#               reject_unauthorized_connection
#           end
#       end
#     end
#
# First, we declare that this connection can be identified by its current_user.
# This allows us to later be able to find all connections established for that
# current_user (and potentially disconnect them). You can declare as many
# identification indexes as you like. Declaring an identification means that an
# attr_accessor is automatically set for that key.
#
# Second, we rely on the fact that the WebSocket connection is established with
# the cookies from the domain being sent along. This makes it easy to use signed
# cookies that were set when logging in via a web interface to authorize the
# WebSocket connection.
#
# Finally, we add a tag to the connection-specific logger with the name of the
# current user to easily distinguish their messages in the log.
#
# Pretty simple, eh?
class Base
# Mixins supplying the rest of the connection's behavior. Their definitions
# live outside this file; the include order is significant for method lookup.
include Identification
include InternalChannel
include Authorization
include Callbacks
include ActiveSupport::Rescuable
# Public read-only accessors. All but `protocol` are assigned in #initialize;
# `protocol` is assigned in #handle_open once the socket has opened.
attr_reader :server, :env, :subscriptions, :logger, :worker_pool, :protocol
# `event_loop`, `pubsub`, and `config` are forwarded to the server object.
delegate :event_loop, :pubsub, :config, to: :server
# Builds the connection for a single incoming request.
#
# `server` supplies the worker pool, logger, and configuration used by this
# connection. `env` is the Rack environment of the request. `coder` must
# respond to `encode` and `decode`; it serializes transmitted messages and
# parses received ones.
def initialize(server, env, coder: ActiveSupport::JSON)
  @server = server
  @env = env
  @coder = coder

  @worker_pool = server.worker_pool
  # Must come after @server and @env are set: building the tagged logger reads
  # the server's configuration and the request derived from the environment.
  @logger = new_tagged_logger

  @websocket = ActionCable::Connection::WebSocket.new(env, self, event_loop)
  @subscriptions = ActionCable::Connection::Subscriptions.new(self)
  @message_buffer = ActionCable::Connection::MessageBuffer.new(self)

  @_internal_subscriptions = nil
  @started_at = Time.now
end
# Called by the server when a new WebSocket connection is established. Logs
# the start of the request, then answers it: the successful response when a
# WebSocket is possible and the request origin is allowed, the invalid-request
# response otherwise. Do not call this method directly -- rely on the #connect
# (and #disconnect) callbacks instead.
def process # :nodoc:
  logger.info started_request_message

  upgradable = websocket.possible? && allow_request_origin?
  return respond_to_invalid_request unless upgradable

  respond_to_successful_request
end
# Accepts a raw incoming WebSocket message and schedules
# #dispatch_websocket_message for it via #send_async. Decoding and routing to
# the subscribed channels happen in that later step, not here.
def receive(websocket_message) # :nodoc:
  send_async(:dispatch_websocket_message, websocket_message)
end
# Decodes a raw WebSocket message and runs it as a channel command.
#
# If the WebSocket is no longer alive by the time the message is processed,
# the message is not executed; an error line containing the inspected raw
# message is logged instead.
def dispatch_websocket_message(websocket_message) # :nodoc:
  if websocket.alive?
    handle_channel_command decode(websocket_message)
  else
    logger.error "Ignoring message processed after the WebSocket was closed: #{websocket_message.inspect}"
  end
end
# Executes an already-decoded command payload against this connection's
# subscriptions, wrapped in the `:command` callbacks.
def handle_channel_command(payload)
  run_callbacks(:command) { subscriptions.execute_command(payload) }
end
# Encodes `cable_message` with the connection's coder and hands the encoded
# form to the WebSocket for transmission.
def transmit(cable_message) # :nodoc:
  encoded_message = encode(cable_message)
  websocket.transmit(encoded_message)
end
# Close the WebSocket connection.
#
# Before the socket is closed, a disconnect message carrying `reason` and the
# `reconnect` flag is transmitted over it.
def close(reason: nil, reconnect: true)
  disconnect_type = ActionCable::INTERNAL[:message_types][:disconnect]
  transmit(type: disconnect_type, reason: reason, reconnect: reconnect)

  websocket.close
end
# Schedules `method` to be invoked on this connection, with `arguments`,
# asynchronously through the worker pool.
def send_async(method, *arguments)
  worker_pool.async_invoke(self, method, *arguments)
end
# Returns a hash of basic statistics for the connection with the keys
# `identifier`, `started_at`, `subscriptions`, and `request_id` (in that
# order). Suitable as the payload of a health check against the connection.
def statistics
  stats = { identifier: connection_identifier }
  stats[:started_at] = @started_at
  stats[:subscriptions] = subscriptions.identifiers
  stats[:request_id] = @env["action_dispatch.request_id"]
  stats
end
# Transmits a ping message whose `message` field is the current time as
# integer seconds (`Time.now.to_i`).
def beat
  ping_type = ActionCable::INTERNAL[:message_types][:ping]
  transmit(type: ping_type, message: Time.now.to_i)
end
# Open hook: schedules #handle_open through #send_async instead of running it
# inline.
def on_open # :nodoc:
  send_async(:handle_open)
end
# Message hook: appends the incoming message to the connection's message
# buffer.
def on_message(message) # :nodoc:
  message_buffer.append(message)
end
# Error hook: records the error on the connection logger so socket problems
# are easier to diagnose. No other handling is performed here.
def on_error(message) # :nodoc:
  logger.error("WebSocket error occurred: #{message}")
end
# Close hook: schedules #handle_close through #send_async. `reason` and `code`
# are accepted but not used.
def on_close(reason, code) # :nodoc:
  send_async(:handle_close)
end
# Compact representation: the class name followed by `object_id << 1`
# rendered as 0x-prefixed, zero-padded hexadecimal. Instance state is omitted.
def inspect # :nodoc:
  hex_id = format('%#016x', object_id << 1)
  "#<#{self.class.name}:#{hex_id}>"
end
# Everything defined below this marker is private to the connection.
private
# Internal collaborators created in #initialize.
attr_reader :websocket
attr_reader :message_buffer
# The request that initiated the WebSocket connection, giving access to the
# environment, cookies, etc. Built once and memoized.
#
# When a Rails application is present, its `env_config` is used as the base
# environment with the connection's own `env` merged on top; otherwise the
# connection's `env` is used as-is.
def request # :doc:
  @request ||= begin
    rails_app = Rails.application if defined?(Rails.application)
    request_env = rails_app ? rails_app.env_config.merge(env) : env
    ActionDispatch::Request.new(request_env)
  end
end
# The cookie jar of the request that initiated the WebSocket connection.
# Handy for authorization checks.
def cookies # :doc:
  request.cookie_jar
end
# Serializes an outgoing cable message with the coder given to #initialize.
def encode(cable_message)
  @coder.encode(cable_message)
end
# Parses an incoming raw WebSocket message with the coder given to
# #initialize.
def decode(websocket_message)
  @coder.decode(websocket_message)
end
# Runs the open sequence for the connection. In order: records the negotiated
# WebSocket protocol, runs the optional #connect callback, subscribes to the
# internal channel, sends the welcome message, processes the message buffer,
# and registers the connection with the server.
#
# If an UnauthorizedError is raised along the way, the connection is closed
# with the "unauthorized" reason and `reconnect: false` -- but only when the
# WebSocket is still alive.
def handle_open
  @protocol = websocket.protocol
  connect if respond_to?(:connect)
  subscribe_to_internal_channel
  send_welcome_message

  message_buffer.process!
  server.add_connection(self)
rescue ActionCable::Connection::Authorization::UnauthorizedError
  return unless websocket.alive?

  unauthorized = ActionCable::INTERNAL[:disconnect_reasons][:unauthorized]
  close(reason: unauthorized, reconnect: false)
end
# Runs the close sequence for the connection. In order: logs the finished
# request, removes the connection from the server, unsubscribes from every
# channel and from the internal channel, and finally runs the optional
# #disconnect callback.
def handle_close
  logger.info(finished_request_message)

  server.remove_connection(self)

  subscriptions.unsubscribe_from_all
  unsubscribe_from_internal_channel

  disconnect if respond_to?(:disconnect)
end
# Transmits the welcome message. Per the original note, it is addressed to the
# internal connection monitor channel so that the monitor's state is reset
# after a successful WebSocket connection.
def send_welcome_message
  welcome_type = ActionCable::INTERNAL[:message_types][:welcome]
  transmit(type: welcome_type)
end
# Decides whether the request's Origin header is acceptable.
#
# Returns true when request forgery protection is disabled, when same-origin
# requests are allowed and the origin equals "<scheme>://<HTTP_HOST>", or when
# any configured allowed origin matches the origin via `===` (so entries may
# be strings or regular expressions). Otherwise logs an error and returns
# false.
def allow_request_origin?
  cable_config = server.config
  return true if cable_config.disable_request_forgery_protection

  origin = env["HTTP_ORIGIN"]
  scheme = Rack::Request.new(env).ssl? ? "https" : "http"

  same_origin = cable_config.allow_same_origin_as_host && origin == "#{scheme}://#{env['HTTP_HOST']}"
  return true if same_origin
  return true if Array(cable_config.allowed_request_origins).any? { |candidate| candidate === origin }

  logger.error("Request origin not allowed: #{origin}")
  false
end
# Logs the successful upgrade and returns the WebSocket's Rack response.
def respond_to_successful_request
  logger.info(successful_request_message)
  websocket.rack_response
end
# Rejects a request that cannot be served as a WebSocket connection. If the
# WebSocket is alive it is closed with the "invalid_request" reason; the
# failure and the end of the request are then logged, and a plain-text 404
# Rack response is returned.
def respond_to_invalid_request
  if websocket.alive?
    close(reason: ActionCable::INTERNAL[:disconnect_reasons][:invalid_request])
  end

  logger.error(invalid_request_message)
  logger.info(finished_request_message)

  headers = { Rack::CONTENT_TYPE => "text/plain; charset=utf-8" }
  [404, headers, ["Page not found"]]
end
# Wraps the server's logger in a TaggedLoggerProxy carrying this connection's
# tags. Tags are declared in the server configuration but computed here, which
# allows them to be tailored per connection: a callable tag is invoked with
# the request, anything else is stringified and camelized.
def new_tagged_logger
  base_logger = server.logger
  tags = server.config.log_tags.map do |tag|
    if tag.respond_to?(:call)
      tag.call(request)
    else
      tag.to_s.camelize
    end
  end

  TaggedLoggerProxy.new(base_logger, tags: tags)
end
# Builds the log line written when request processing starts:
#
#     Started <METHOD> "<filtered path>" [WebSocket|non-WebSocket] for <ip> at <time>
#
# Both markers carry a leading space so the marker is always separated from
# the quoted path, whether or not a WebSocket is possible.
def started_request_message
  'Started %s "%s"%s for %s at %s' % [
    request.request_method,
    request.filtered_path,
    websocket.possible? ? " [WebSocket]" : " [non-WebSocket]",
    request.ip,
    Time.now.to_s ]
end
# Builds the log line written when request processing finishes:
#
#     Finished "<filtered path>" [WebSocket|non-WebSocket] for <ip> at <time>
#
# Both markers carry a leading space so the marker is always separated from
# the quoted path, whether or not a WebSocket is possible.
def finished_request_message
  'Finished "%s"%s for %s at %s' % [
    request.filtered_path,
    websocket.possible? ? " [WebSocket]" : " [non-WebSocket]",
    request.ip,
    Time.now.to_s ]
end
# Log line for a request that could not be upgraded to a WebSocket. Includes
# the REQUEST_METHOD, HTTP_CONNECTION, and HTTP_UPGRADE values from the
# environment.
def invalid_request_message
  format(
    "Failed to upgrade to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)",
    env["REQUEST_METHOD"],
    env["HTTP_CONNECTION"],
    env["HTTP_UPGRADE"]
  )
end
# Log line for a request that was upgraded to a WebSocket. Includes the
# REQUEST_METHOD, HTTP_CONNECTION, and HTTP_UPGRADE values from the
# environment.
def successful_request_message
  format(
    "Successfully upgraded to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)",
    env["REQUEST_METHOD"],
    env["HTTP_CONNECTION"],
    env["HTTP_UPGRADE"]
  )
end
end
end
end
# Runs the load hooks registered under :action_cable_connection, passing them
# the Base class now that it is fully defined.
ActiveSupport.run_load_hooks(:action_cable_connection, ActionCable::Connection::Base)
|