File: connection.rb

package info (click to toggle)
ruby-mongo 2.21.3-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 14,764 kB
  • sloc: ruby: 108,806; makefile: 5; sh: 2
file content (428 lines) | stat: -rw-r--r-- 13,751 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
# frozen_string_literal: true
# rubocop:todo all

# Copyright (C) 2014-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo
  class Server

    # This class models the socket connections for servers and their behavior.
    #
    # @since 2.0.0
    class Connection < ConnectionBase
      include Monitoring::Publishable
      include Retryable
      include Id
      extend Forwardable

      # Command document used to ping a server.
      #
      # @since 2.1.0
      #
      # @deprecated No longer necessary with Server Selection specification.
      PING = { ping: 1 }.freeze

      # Command document used to ping a server via OP_MSG (server versions
      # >= 3.6); it additionally names the admin database under '$db'.
      #
      # @since 2.5.0
      #
      # @deprecated No longer necessary with Server Selection specification.
      PING_OP_MSG = { ping: 1, '$db' => Database::ADMIN }.freeze

      # Query protocol message wrapping the ping command.
      #
      # @since 2.1.0
      #
      # @deprecated No longer necessary with Server Selection specification.
      PING_MESSAGE = Protocol::Query.new(
        Database::ADMIN,
        Database::COMMAND,
        PING,
        limit: -1
      )

      # OP_MSG protocol message wrapping the ping command (server versions
      # >= 3.6).
      #
      # @since 2.5.0
      #
      # @deprecated No longer necessary with Server Selection specification.
      PING_OP_MSG_MESSAGE = Protocol::Msg.new([], {}, PING_OP_MSG)

      # Frozen, pre-serialized bytes of the ping query message.
      #
      # @since 2.1.0
      #
      # @deprecated No longer necessary with Server Selection specification.
      PING_BYTES = PING_MESSAGE.serialize.to_s.freeze

      # Frozen, pre-serialized bytes of the ping OP_MSG message (server
      # versions >= 3.6).
      #
      # @since 2.5.0
      #
      # @deprecated No longer necessary with Server Selection specification.
      PING_OP_MSG_BYTES = PING_OP_MSG_MESSAGE.serialize.to_s.freeze

      # Builds a connection object for the given server and options.
      #
      # Construction performs no I/O: no socket is created and neither the
      # handshake nor authentication takes place. Call connect! on the
      # resulting object to establish the network connection.
      #
      # A ConnectionCreated CMAP event is published once the object is set up.
      #
      # @api private
      #
      # @example Create the connection.
      #   Connection.new(server)
      #
      # @note Connection must never be directly instantiated outside of a
      #   Server.
      #
      # @param [ Mongo::Server ] server The server the connection is for.
      # @param [ Hash ] options The connection options. The hash is frozen
      #   by the constructor.
      #
      # @option options :pipe [ IO ] The file descriptor for the read end of the
      #   pipe to listen on during the select system call when reading from the
      #   socket.
      # @option options [ Integer ] :generation The generation of this
      #   connection. It should only be given when not in load-balancing
      #   mode, and should then be the generation of the connection pool at
      #   the time the connection is created. In load-balancing mode the
      #   generation is assigned after the handshake completes.
      # @option options [ Hash ] :server_api The requested server API version.
      #   This hash can have the following items:
      #   - *:version* -- string
      #   - *:strict* -- boolean
      #   - *:deprecation_errors* -- boolean
      #
      # @raise [ ArgumentError ] If :generation is given while the server is
      #   a load balancer.
      #
      # @since 2.0.0
      def initialize(server, options = {})
        generation_on_load_balancer = server.load_balancer? && options[:generation]
        raise ArgumentError, "Generation cannot be set when server is a load balancer" if generation_on_load_balancer

        # Identifiers and collaborators taken from the server and the class.
        @id = server.next_connection_id
        @global_id = self.class.next_id
        @monitoring = server.monitoring
        @options = options.freeze
        @server = server

        # Nothing is connected, checked in or authenticated yet.
        @socket = @last_checkin = @auth_mechanism = nil
        @pid = Process.pid
        @pinned = false

        created_event = Monitoring::Event::Cmap::ConnectionCreated.new(address, id)
        publish_cmap_event(created_event)
      end

      # @return [ Time | nil ] The last time the connection was checked back
      #   into a pool (set by record_checkin!), or nil when no checkin has
      #   been recorded since the connection was created or disconnected.
      #
      # @since 2.5.0
      attr_reader :last_checkin

      # @return [ Integer ] The ID for the connection, obtained from the
      #   server when the connection is constructed. This will be unique
      #   across connections to the same server object.
      #
      # @since 2.9.0
      attr_reader :id

      # @return [ Integer ] The global ID for the connection, obtained from
      #   the class-level ID generator when the connection is constructed.
      #   This will be unique across all connections.
      attr_reader :global_id

      # The connection pool from which this connection was created, read
      # from the :connection_pool entry of the connection options.
      # May be nil.
      #
      # @return [ Object | nil ] The value stored under :connection_pool
      #   (presumably a Server::ConnectionPool -- confirm against callers).
      #
      # @api private
      def connection_pool
        options[:connection_pool]
      end

      # Whether the connection currently holds a socket and has not been
      # closed, interrupted, or marked with an error.
      #
      # @return [ true | false ] if the connection was connected.
      def connected?
        # Any of these states disqualifies the connection outright.
        return false if closed? || error? || interrupted?

        @socket ? true : false
      end

      # Whether the connection was closed.
      #
      # A closed connection should not be used any further; obtain a new
      # connection from the connection pool instead.
      #
      # @return [ true | false ] Whether connection was closed.
      #
      # @since 2.9.0
      def closed?
        @closed ? true : false
      end

      # Whether the connection was interrupted.
      #
      # Interrupted connections were already removed from the pool and should
      # not be checked back into it.
      #
      # @return [ true | false ] Whether connection was interrupted.
      def interrupted?
        @interrupted ? true : false
      end

      # Mark the connection as interrupted, so that interrupted? reports
      # true from now on.
      #
      # @return [ true ] Always true (the value assigned to the flag).
      def interrupted!
        @interrupted = true
      end

      # Whether an error has been recorded on this connection.
      #
      # @return [ true | false ] Whether an error is stored on the connection.
      #
      # @api private
      def error?
        @error ? true : false
      end

      # Whether the connection is used by a transaction or cursor operations.
      #
      # Pinned connections should not be disconnected and removed from a
      # connection pool if they are idle or stale.
      #
      # @return [ true | false ] Whether connection is pinned.
      #
      # @api private
      def pinned?
        # Coerce to a strict boolean, like closed?, interrupted? and error?,
        # so the documented return type holds even if the flag is unset.
        !!@pinned
      end

      # Mark the connection as pinned, so that pinned? reports it as in use.
      #
      # @return [ true ] Always true (the value assigned to the flag).
      #
      # @api private
      def pin
        @pinned = true
      end

      # Mark the connection as not pinned, reversing a previous call to pin.
      #
      # @return [ false ] Always false (the value assigned to the flag).
      #
      # @api private
      def unpin
        @pinned = false
      end

      # Establishes a network connection to the target address.
      #
      # Does nothing (beyond the closed/errored check) when a socket is
      # already present. Otherwise a socket is created, the handshake and
      # authentication are performed, the resulting description and
      # compressor are stored, and a ConnectionReady CMAP event is published.
      # When the server is a load balancer, the connection generation is
      # looked up from the connection pool using the service id.
      #
      # @example Connect to the host.
      #   connection.connect!
      #
      # @note This method mutates the connection object by setting a socket if
      #   one previously did not exist.
      #
      # @param [ Object | nil ] context Optional object forwarded to socket
      #   creation, where it is asked for remaining_timeout_sec and csot?
      #   (presumably an operation context -- confirm against callers).
      #
      # @return [ true ] If the connection succeeded.
      #
      # @raise [ Error::ConnectionPerished ] If the connection was closed or
      #   has a recorded error.
      # @raise [ Error::InternalDriverError ] If linting is enabled and the
      #   server is a load balancer but no service id is set.
      #
      # @since 2.0.0
      def connect!(context = nil)
        raise_if_closed!
        return true if @socket

        @socket = create_socket(context)
        @description, @compressor = do_connect

        if server.load_balancer?
          if Lint.enabled? && !service_id
            raise Error::InternalDriverError, "The connection is to a load balancer and it must have service_id set here, but does not"
          end
          @generation = connection_pool.generation_manager.generation(service_id: service_id)
        end

        ready_event = Monitoring::Event::Cmap::ConnectionReady.new(address, id)
        publish_cmap_event(ready_event)

        # A fresh socket means a future close must be announced again.
        @close_event_published = false

        true
      end

      # Creates the socket. This is kept apart from do_connect so that
      # pending connections can be closed if they are interrupted during hello.
      #
      # The socket options are the SSL options extended with the connection
      # address, the connection generation, the :pipe option, and timeout
      # information taken from the context when one is given.
      #
      # @param [ Object | nil ] context Optional object asked for
      #   remaining_timeout_sec and csot?.
      #
      # @return [ Socket ] The created socket.
      private def create_socket(context = nil)
        add_server_diagnostics do
          base_options = ssl_options
          socket_options = base_options.merge(
            connection_address: address,
            connection_generation: generation,
            pipe: options[:pipe],
            connect_timeout: context&.remaining_timeout_sec,
            csot: (context&.csot? ? true : false)
          )
          address.socket(socket_timeout, socket_options)
        end
      end

      # Performs the handshake and authentication over the current socket.
      #
      # Separate method to permit easier mocking in the test suite.
      #
      # If anything goes wrong the socket is closed on a best-effort basis,
      # the socket reference is always cleared, and the original exception
      # is re-raised. An error raised while closing never masks the failure
      # that triggered the cleanup, and never leaves a stale socket behind.
      #
      # @return [ Array<Server::Description, String | Symbol> ] A server
      #   description instance from the hello response of the returned socket
      #   and the compressor to use.
      #
      # @raise [ Error::ConnectionPerished ] If the connection was closed or
      #   has a recorded error.
      private def do_connect
        raise_if_closed!
        begin
          pending_connection = PendingConnection.new(
            socket, @server, monitoring, options.merge(id: id))
          pending_connection.handshake_and_authenticate!
        rescue Exception
          # Deliberately broader than StandardError -- presumably so that the
          # socket is also released on interrupts. The exception is always
          # re-raised below.
          begin
            socket&.close
          rescue StandardError
            # Closing is best effort; the handshake failure is what matters.
          ensure
            # Never keep a reference to a socket whose handshake failed,
            # otherwise connect! would treat it as an established connection.
            @socket = nil
          end
          raise
        end

        [pending_connection.description, pending_connection.compressor]
      end

      # Disconnect the connection.
      #
      # Clears the authentication mechanism and the last checkin time, closes
      # and drops the socket if there is one (errors while closing are
      # ignored), marks the connection as closed and publishes a
      # ConnectionClosed CMAP event unless one was already published for the
      # current socket.
      #
      # @note Once a connection is disconnected, it should no longer be used.
      #   A new connection should be obtained from the connection pool which
      #   will either return a ready connection or create a new connection.
      #   Calling connect! on a disconnected connection raises
      #   Error::ConnectionPerished.
      #
      # @note This method mutates the connection object by setting the socket
      #   to nil.
      #
      # @param [ Hash | nil ] options Optional settings for the disconnect.
      #
      # @option options [ Symbol ] :reason The reason why the connection is
      #   being closed.
      # @option options [ true | false ] :interrupted Whether or not the
      #   connection was interrupted.
      #
      # @return [ true ] If the disconnect succeeded.
      #
      # @since 2.0.0
      def disconnect!(options = nil)
        @auth_mechanism = nil
        @last_checkin = nil

        # @closed may already be true while a socket is still held, so the
        # decision is based on the socket and not on the flag.
        if socket
          begin
            socket.close
          rescue StandardError
            # Closing is best effort; the socket is dropped regardless.
          end
          @socket = nil
        end

        @closed = true
        interrupted! if options && options[:interrupted]

        # To satisfy CMAP spec tests, the close event is published even when
        # the socket was never connected (and thus the ready event was never
        # published). It is published at most once per socket: the flag is
        # reset when the connection is connected again.
        unless @close_event_published
          close_reason = options && options[:reason]
          closed_event = Monitoring::Event::Cmap::ConnectionClosed.new(
            address, id, close_reason
          )
          publish_cmap_event(closed_event)
          @close_event_published = true
        end

        true
      end

      # Ping the connection to see if the server is responding to commands.
      # This is non-blocking on the server side.
      #
      # The pre-serialized OP_MSG ping is written when the server features
      # report OP_MSG support, otherwise the pre-serialized query ping; the
      # reply is then read back from the same socket.
      #
      # @example Ping the connection.
      #   connection.ping
      #
      # @note This uses a pre-serialized ping message for optimization.
      #
      # @return [ true, false ] If the server is accepting connections.
      #
      # @since 2.1.0
      #
      # @deprecated No longer necessary with Server Selection specification.
      def ping
        payload =
          if features.op_msg_enabled?
            PING_OP_MSG_BYTES
          else
            PING_BYTES
          end

        ensure_connected do |sock|
          response = add_server_diagnostics do
            sock.write(payload)
            Protocol::Message.deserialize(sock, max_message_size)
          end
          # The ping succeeded when the first reply document reports ok == 1.
          response.documents[0][Operation::Result::OK] == 1
        end
      end

      # Get the timeout to execute an operation on a socket.
      #
      # The value is read from the :socket_timeout option and remembered
      # once a truthy value has been seen.
      #
      # @return [ Float ] The operation timeout in seconds.
      #
      # @since 2.0.0
      def socket_timeout
        return @timeout if @timeout

        @timeout = options[:socket_timeout]
      end
      # @deprecated Please use :socket_timeout instead. Will be removed in 3.0.0
      alias timeout socket_timeout

      # Record the current time as the last checkin time.
      #
      # @example Record the checkin time on this connection.
      #   connection.record_checkin!
      #
      # @return [ self ]
      #
      # @since 2.5.0
      def record_checkin!
        tap { @last_checkin = Time.now }
      end

      private

      # Delegates to the superclass implementation with the same arguments,
      # wrapped in handle_errors so that socket failures raised while
      # delivering are recorded on this connection.
      def deliver(message, client, options = {})
        handle_errors { super }
      end

      # Runs the given block and records socket failures on the connection.
      #
      # An Error::SocketError is stored as the connection error, the server
      # is marked unknown (passing along the error's generation and service
      # id, and requesting that the push monitor be stopped), and the error
      # is re-raised. An Error::SocketTimeoutError is stored and re-raised
      # without marking the server unknown. Other exceptions pass through
      # untouched.
      #
      # @return [ Object ] The block's return value.
      def handle_errors
        yield
      rescue Error::SocketError => socket_error
        @error = socket_error
        @server.unknown!(
          generation: socket_error.generation,
          # or description.service_id?
          service_id: socket_error.service_id,
          stop_push_monitor: true,
        )
        raise
      rescue Error::SocketTimeoutError => timeout_error
        @error = timeout_error
        raise
      end

      # Refuses further use of a connection that has a recorded error or has
      # been closed. The error state is checked first.
      #
      # @return [ nil ] When the connection is neither errored nor closed.
      #
      # @raise [ Error::ConnectionPerished ] If the connection has a recorded
      #   error or was closed.
      def raise_if_closed!
        raise Error::ConnectionPerished, "Connection #{generation}:#{id} for #{address.seed} is perished. Reconnecting closed or errored connections is no longer supported" if error?

        raise Error::ConnectionPerished, "Connection #{generation}:#{id} for #{address.seed} is closed. Reconnecting closed or errored connections is no longer supported" if closed?
      end
    end
  end
end