File: connection_common.rb

package info (click to toggle)
ruby-mongo 2.23.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 15,020 kB
  • sloc: ruby: 110,810; makefile: 5
file content (208 lines) | stat: -rw-r--r-- 7,305 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# frozen_string_literal: true
# rubocop:todo all

# Copyright (C) 2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo
  class Server

    # Common methods used by both monitoring and non-monitoring connections.
    #
    # @note Although methods of this class are part of the public API,
    #   the fact that these methods are defined on this class and not on
    #   the classes which inherit from it is not part of the public API.
    #
    # @api semipublic
    class ConnectionCommon

      # The compressor negotiated during the handshake for this connection,
      # if any.
      #
      # This attribute is nil for connections that haven't completed the
      # handshake yet, and for connections that negotiated no compression.
      #
      # @return [ String | nil ] The compressor.
      attr_reader :compressor

      # Report whether this connection currently holds a socket.
      #
      # @example Is the connection connected?
      #   connection.connected?
      #
      # @return [ true, false ] true when a socket is present, false otherwise.
      #
      # @deprecated
      def connected?
        socket ? true : false
      end

      # @return [ Integer ] pid The process id when the connection was created.
      # @api private
      attr_reader :pid

      # Assemble the document sent to a server as the connection handshake.
      #
      # The base document is HELLO_DOC (extended with the transformed server
      # API parameters) when a server API is in effect, and LEGACY_HELLO_DOC
      # otherwise. The server API configured on the application metadata takes
      # precedence over the +server_api+ argument. The validated application
      # metadata document is merged on top of the base document.
      #
      # @param [ Server::AppMetadata ] app_metadata Application metadata
      # @param [ BSON::Document ] speculative_auth_doc The speculative
      #   authentication document, if any.
      # @param [ true | false ] load_balancer Whether the connection is to
      #   a load balancer.
      # @param [ Hash | nil ] server_api Server API version, used only when
      #   the application metadata does not carry one.
      #
      # @return [BSON::Document] Document that should be sent to a server
      #     for handshake purposes.
      #
      # @api private
      def handshake_document(app_metadata, speculative_auth_doc: nil, load_balancer: false, server_api: nil)
        effective_api = app_metadata.server_api || server_api
        base =
          if effective_api
            HELLO_DOC.merge(Utils.transform_server_api(effective_api))
          else
            LEGACY_HELLO_DOC
          end

        # merge returns a new document, so the frozen base constants are
        # never mutated by the updates below.
        handshake = base.merge(app_metadata.validated_document)
        handshake.update(speculativeAuthenticate: speculative_auth_doc) if speculative_auth_doc
        handshake.update(loadBalanced: true) if load_balancer
        handshake
      end

      # Wrap a handshake document in the protocol message used to send it.
      #
      # A document carrying 'apiVersion' or 'loadBalanced' is sent as a
      # Protocol::Msg addressed to the admin database; any other document is
      # sent as a Protocol::Query against the admin command collection with a
      # limit of -1.
      #
      # @param [ BSON::Document ] handshake_document Document that should be
      #   sent to a server for handshake purpose.
      #
      # @return [ Protocol::Message ] Command that should be sent to a server
      #   for handshake purposes.
      #
      # @api private
      def handshake_command(handshake_document)
        use_msg = handshake_document['apiVersion'] || handshake_document['loadBalanced']

        unless use_msg
          return Protocol::Query.new(
            Database::ADMIN,
            Database::COMMAND,
            handshake_document,
            limit: -1
          )
        end

        payload = handshake_document.merge({ '$db' => Database::ADMIN })
        Protocol::Msg.new([], {}, payload)
      end


      private

      # Frozen base document that #handshake_document starts from when a
      # server API version is in effect.
      #
      # Note that the `private` keyword only affects methods, so this constant
      # remains reachable from outside the class.
      HELLO_DOC = BSON::Document.new({ hello: 1 }).freeze

      # Frozen base document that #handshake_document starts from when no
      # server API version is in effect.
      # NOTE(review): helloOk presumably tells the server that this client
      # also understands the newer hello command - confirm against the
      # server handshake documentation.
      LEGACY_HELLO_DOC = BSON::Document.new({ isMaster: 1, helloOk: true }).freeze

      # The underlying socket, or a falsy value when there is none. Private
      # reader used by #connected? and #ensure_connected.
      attr_reader :socket

      # Pick the compressor for this connection from the server's handshake
      # reply.
      #
      # Nothing happens unless compressors were requested through the
      # :compressors option. When the server advertised a compression list,
      # @compressor is set to the first server algorithm that was also
      # requested, or to nil when the two lists have nothing in common. A
      # warning is logged when no algorithm could be agreed on, either because
      # the lists are disjoint or because the server advertised no compression
      # at all.
      #
      # @param [ Hash | BSON::Document ] reply The handshake reply; only its
      #   'compression' entry is read.
      #
      # @return [ String | Object | nil ] The selected compressor on success;
      #   otherwise whatever log_warn returns, or nil when no compressors
      #   were requested.
      def set_compressor!(reply)
        server_compressors = reply['compression']
        requested = options[:compressors]
        return unless requested

        if server_compressors
          # Array#& returns an empty array for disjoint lists, and an empty
          # array is truthy in Ruby. The decision therefore has to be based on
          # whether a common algorithm actually exists, not on the truthiness
          # of the intersection.
          @compressor = (server_compressors & requested).first
          return @compressor if @compressor

          msg = "The server at #{address} has no compression algorithms in common with those requested. " +
            "Server algorithms: #{server_compressors.join(', ')}; " +
            "Requested algorithms: #{requested.join(', ')}. " +
            "Compression will not be used"
        else
          msg = "The server at #{address} did not advertise compression support. " +
            "Requested algorithms: #{requested.join(', ')}. " +
            "Compression will not be used"
        end

        log_warn(msg)
      end

      # Yields to the block and, if the block raises a socket error, annotates
      # the exception with details of this connection before re-raising it.
      #
      # This method is intended to add server address information to exceptions
      # raised during execution of operations on servers. A note of the form
      # "on <seed>[, connection <generation>:<id>][, service id <service id>]"
      # is added, and for connections that expose a generation the exception's
      # generation, connection_global_id and service_id attributes are filled
      # in when the corresponding reader is available.
      #
      # Exceptions other than Error::SocketError and Error::SocketTimeoutError
      # propagate untouched.
      #
      # @return [ Object ] The value of the block.
      #
      # @raise [ Error::SocketError, Error::SocketTimeoutError ] The original
      #   exception, annotated.
      def add_server_diagnostics
        yield
      # Note that the exception should already have been mapped to a
      # Mongo::Error subclass when it gets to this method.
      rescue Error::SocketError, Error::SocketTimeoutError => e
        # Server::Monitor::Connection does not reference its server, but
        # knows its address. Server::Connection delegates the address to its
        # server.
        note = +"on #{address.seed}"
        note << ", connection #{generation}:#{id}" if respond_to?(:id)
        # Non-monitoring connections have service id.
        # Monitoring connections do not.
        if respond_to?(:service_id) && service_id
          note << ", service id #{service_id}"
        end
        e.add_note(note)
        if respond_to?(:generation)
          # Non-monitoring connections
          e.generation = generation
          e.connection_global_id = global_id if respond_to?(:global_id)
          # Guard on the method that is actually called. Checking an unrelated
          # method here could raise NameError inside this rescue and hide the
          # original socket error.
          e.service_id = service_id if respond_to?(:service_id)
        end
        raise e
      end

      # Return the TLS-related subset of the connection options, memoized and
      # frozen.
      #
      # When the :ssl option is truthy, every option whose key starts with
      # "ssl" is included. Otherwise the result is exactly {ssl: false}.
      #
      # @return [ Hash ] The frozen TLS options.
      def ssl_options
        return @ssl_options if @ssl_options

        tls_settings =
          if options[:ssl]
            options.select { |key, _value| key.to_s.start_with?('ssl') }
          else
            # Due to the way options are propagated from the client, if we
            # decide that we don't want to use TLS we need to have the :ssl
            # option explicitly set to false or the value provided to the
            # connection might be overwritten by the default inherited from
            # the client.
            {ssl: false}
          end

        @ssl_options = tls_settings.freeze
      end

      # Yield the socket to the block, provided the connection is usable.
      #
      # The connection is flagged as errored (@error = true) whenever this
      # method does not run to completion: when the block raises or exits
      # non-locally, and also when one of the precondition checks below
      # raises. Once flagged, every later call raises
      # Error::ConnectionPerished.
      #
      # @yieldparam [ Object ] socket The connection's socket.
      #
      # @return [ Object ] The value returned by the block.
      #
      # @raise [ ArgumentError ] If there is no socket.
      # @raise [ Error::ConnectionPerished ] If an earlier call failed.
      def ensure_connected
        completed = false

        unless socket
          raise ArgumentError, "Connection #{generation}:#{id} for #{address.seed} is not connected"
        end
        raise Error::ConnectionPerished, "Connection #{generation}:#{id} for #{address.seed} is perished" if @error

        outcome = yield socket
        completed = true
        outcome
      ensure
        @error = true unless completed
      end
    end
  end
end