1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
|
# frozen_string_literal: true
# rubocop:todo all
# Copyright (C) 2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
module Mongo
class Server
# Common methods used by both monitoring and non-monitoring connections.
#
# @note Although methods of this class are part of the public API,
# the fact that these methods are defined on this class and not on
# the classes which inherit from this class is not part of the public API.
#
# @api semipublic
class ConnectionCommon
# The compressor negotiated during the handshake for this connection,
# if any.
#
# This attribute is nil for connections that haven't completed the
# handshake yet, and for connections that negotiated no compression.
#
# @return [ String | nil ] The compressor.
attr_reader :compressor
# Report whether this connection currently holds a socket.
#
# @example Is the connection connected?
#   connection.connected?
#
# @return [ true, false ] true when the socket reader returns a truthy
#   value, false otherwise.
#
# @deprecated
def connected?
  socket ? true : false
end
# @return [ Integer ] pid The process id when the connection was created.
# @api private
attr_reader :pid
# Assemble the document sent to a server as the connection handshake.
#
# The server API taken from the application metadata wins over the
# +server_api+ argument. When either is present the document is based on
# HELLO_DOC plus the transformed server API parameters; otherwise it is
# based on LEGACY_HELLO_DOC. The validated application metadata document
# is then merged in, followed by the optional speculative authentication
# and load balancer fields.
#
# @param [ Server::AppMetadata ] app_metadata Application metadata.
# @param [ BSON::Document ] speculative_auth_doc The speculative
#   authentication document, if any.
# @param [ true | false ] load_balancer Whether the connection is to
#   a load balancer.
# @param [ Hash | nil ] server_api Server API version, used only when
#   the application metadata does not carry one.
#
# @return [BSON::Document] Document that should be sent to a server
#   for handshake purposes.
#
# @api private
def handshake_document(app_metadata, speculative_auth_doc: nil, load_balancer: false, server_api: nil)
  effective_api = app_metadata.server_api || server_api
  base = effective_api ? HELLO_DOC.merge(Utils.transform_server_api(effective_api)) : LEGACY_HELLO_DOC

  # merge returns a new document, so the frozen base constants are never
  # mutated by the updates below.
  handshake = base.merge(app_metadata.validated_document)
  handshake.update(speculativeAuthenticate: speculative_auth_doc) if speculative_auth_doc
  handshake.update(loadBalanced: true) if load_balancer
  handshake
end
# Wrap a handshake document in the protocol message used to send it.
#
# A document carrying a truthy 'apiVersion' or 'loadBalanced' field is
# sent as a Protocol::Msg with the '$db' field set to the admin database.
# Any other document is sent as a Protocol::Query against the admin
# database's command collection with a limit of -1.
#
# @param [ BSON::Document ] handshake_document Document that should be
#   sent to a server for handshake purpose.
#
# @return [ Protocol::Message ] Command that should be sent to a server
#   for handshake purposes.
#
# @api private
def handshake_command(handshake_document)
  needs_msg = handshake_document['apiVersion'] || handshake_document['loadBalanced']
  unless needs_msg
    return Protocol::Query.new(Database::ADMIN, Database::COMMAND, handshake_document, limit: -1)
  end

  payload = handshake_document.merge({ '$db' => Database::ADMIN })
  Protocol::Msg.new([], {}, payload)
end
# Methods defined below this marker are private. Note that a bare
# `private` only affects method definitions; the two constants that
# follow are not made private by it.
private
# Frozen base document for the handshake when a server API version is in
# effect; handshake_document merges the transformed server API parameters
# into a copy of it.
HELLO_DOC = BSON::Document.new({ hello: 1 }).freeze
# Frozen base document for the handshake when no server API version is in
# effect.
# NOTE(review): helloOk presumably advertises that this driver understands
# the `hello` command - confirm against the handshake specification.
LEGACY_HELLO_DOC = BSON::Document.new({ isMaster: 1, helloOk: true }).freeze
# Private reader for the underlying socket; nil/false is treated as
# "not connected" by connected? and ensure_connected.
attr_reader :socket
# Choose the compressor for this connection from the handshake reply and
# store it in @compressor.
#
# Nothing happens unless compressors were requested through
# options[:compressors]. Otherwise the first entry of the server's
# 'compression' list that was also requested is selected. When the server
# advertised no compressors, or none of the advertised compressors was
# requested, a warning is logged and @compressor is left unchanged.
#
# @param [ Hash ] reply The handshake reply; only its 'compression'
#   field is read.
def set_compressor!(reply)
  requested = options[:compressors]
  return unless requested

  server_compressors = reply['compression']
  # An empty intersection is an (always truthy) empty array, so it must be
  # tested with empty? rather than by truthiness; a missing server list is
  # treated as having nothing in common.
  common = server_compressors ? server_compressors & requested : []

  if common.empty?
    msg = if server_compressors
      "The server at #{address} has no compression algorithms in common with those requested. " +
        "Server algorithms: #{server_compressors.join(', ')}; " +
        "Requested algorithms: #{requested.join(', ')}. " +
        "Compression will not be used"
    else
      "The server at #{address} did not advertise compression support. " +
        "Requested algorithms: #{requested.join(', ')}. " +
        "Compression will not be used"
    end
    log_warn(msg)
  else
    @compressor = common.first
  end
end
# Yields to the block and, if the block raises a socket error, annotates
# the exception with the address of this connection's server before
# re-raising it.
#
# This method is intended to add server address information to exceptions
# raised during execution of operations on servers. Only
# Error::SocketError and Error::SocketTimeoutError are annotated; any
# other exception propagates untouched.
#
# Each piece of optional information (connection id, generation, global
# id, service id) is only read when this object publicly responds to the
# corresponding method, so the same code serves connection classes with
# different sets of attributes.
#
# @yield The operation to run.
#
# @return [ Object ] Whatever the block returns.
#
# @raise [ Error::SocketError, Error::SocketTimeoutError ] The annotated
#   exception raised by the block.
def add_server_diagnostics
  yield
# Note that the exception should already have been mapped to a
# Mongo::Error subclass when it gets to this method.
rescue Error::SocketError, Error::SocketTimeoutError => e
  # Server::Monitor::Connection does not reference its server, but
  # knows its address. Server::Connection delegates the address to its
  # server.
  note = +"on #{address.seed}"
  note << ", connection #{generation}:#{id}" if respond_to?(:id)

  # Non-monitoring connections have service id.
  # Monitoring connections do not.
  has_service_id = respond_to?(:service_id)
  note << ", service id #{service_id}" if has_service_id && service_id
  e.add_note(note)

  if respond_to?(:generation)
    # Non-monitoring connections
    e.generation = generation
    e.connection_global_id = global_id if respond_to?(:global_id)
    # Guard on the method that is actually called: checking an unrelated
    # method here could raise NameError and mask the socket error, or skip
    # the assignment on objects that do expose a service id.
    e.service_id = service_id if has_service_id
  end
  raise e
end
# Returns the TLS-related subset of this connection's options.
#
# The result is computed once, frozen, and cached in @ssl_options. When
# options[:ssl] is truthy it contains every option whose name starts with
# "ssl"; otherwise it is exactly { ssl: false }.
#
# @return [ Hash ] The frozen TLS options.
def ssl_options
  return @ssl_options if @ssl_options

  tls_opts =
    if options[:ssl]
      options.select { |name, _value| name.to_s.start_with?('ssl') }
    else
      # Due to the way options are propagated from the client, if we
      # decide that we don't want to use TLS we need to have the :ssl
      # option explicitly set to false or the value provided to the
      # connection might be overwritten by the default inherited from
      # the client.
      { ssl: false }
    end
  @ssl_options = tls_opts.freeze
end
# Yields the socket to the block, provided this connection has a socket
# and has not previously failed.
#
# If anything prevents the block from completing normally - including the
# precondition failures raised by this method itself - @error is set, and
# every later call raises Error::ConnectionPerished.
#
# @yieldparam socket The connection's socket.
#
# @return [ Object ] The block's return value.
#
# @raise [ ArgumentError ] If the connection has no socket.
# @raise [ Error::ConnectionPerished ] If an earlier call did not complete.
def ensure_connected
  finished = false
  raise ArgumentError, "Connection #{generation}:#{id} for #{address.seed} is not connected" unless socket
  raise Error::ConnectionPerished, "Connection #{generation}:#{id} for #{address.seed} is perished" if @error

  value = yield socket
  # Only reached when the block returned normally.
  finished = true
  value
ensure
  @error = true unless finished
end
end
end
end
|