File: monitor.rb

package info (click to toggle)
ruby-mongo 2.23.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 15,020 kB
  • sloc: ruby: 110,810; makefile: 5
file content (359 lines) | stat: -rw-r--r-- 11,679 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
# frozen_string_literal: true
# rubocop:todo all

# Copyright (C) 2014-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo
  class Server

    # Responsible for periodically polling a server via hello commands to
    # keep the server's status up to date.
    #
    # Does all work in a background thread so as to not interfere with other
    # operations performed by the driver.
    #
    # @since 2.0.0
    # @api private
    class Monitor
      include Loggable
      extend Forwardable
      include Event::Publisher
      include BackgroundThread

      # The default interval between server status refreshes is 10 seconds.
      #
      # @since 2.0.0
      DEFAULT_HEARTBEAT_INTERVAL = 10.freeze

      # The minimum time between forced server scans. Is
      # minHeartbeatFrequencyMS in the SDAM spec.
      #
      # @since 2.0.0
      MIN_SCAN_INTERVAL = 0.5.freeze

      # The weighting factor (alpha) for calculating the average moving round trip time.
      #
      # @since 2.0.0
      # @deprecated Will be removed in version 3.0.
      RTT_WEIGHT_FACTOR = 0.2.freeze

      # Create the new server monitor.
      #
      # @example Create the server monitor.
      #   Mongo::Server::Monitor.new(address, listeners, monitoring)
      #
      # @note Monitor must never be directly instantiated outside of a Server.
      #
      # @param [ Server ] server The server to monitor.
      # @param [ Event::Listeners ] event_listeners The event listeners.
      # @param [ Monitoring ] monitoring The monitoring..
      # @param [ Hash ] options The options.
      #
      # @option options [ Float ] :connect_timeout The timeout, in seconds, to
      #   use when establishing the monitoring connection.
      # @option options [ Float ] :heartbeat_interval The interval between
      #   regular server checks.
      # @option options [ Logger ] :logger A custom logger to use.
      # @option options [ Mongo::Server::Monitor::AppMetadata ] :monitor_app_metadata
      #   The metadata to use for regular monitoring connection.
      # @option options [ Mongo::Server::Monitor::AppMetadata ] :push_monitor_app_metadata
      #   The metadata to use for push monitor's connection.
      # @option options [ Float ] :socket_timeout The timeout, in seconds, to
      #   execute operations on the monitoring connection.
      #
      # @since 2.0.0
      # @api private
      def initialize(server, event_listeners, monitoring, options = {})
        unless monitoring.is_a?(Monitoring)
          raise ArgumentError, "Wrong monitoring type: #{monitoring.inspect}"
        end
        unless options[:app_metadata]
          raise ArgumentError, 'App metadata is required'
        end
        unless options[:push_monitor_app_metadata]
          raise ArgumentError, 'Push monitor app metadata is required'
        end
        @server = server
        @event_listeners = event_listeners
        @monitoring = monitoring
        @options = options.freeze
        @mutex = Mutex.new
        @sdam_mutex = Mutex.new
        @next_earliest_scan = @next_wanted_scan = Time.now
        @update_mutex = Mutex.new
      end

      # @return [ Server ] server The server that this monitor is monitoring.
      # @api private
      attr_reader :server

      # @return [ Mongo::Server::Monitor::Connection ] connection The connection to use.
      attr_reader :connection

      # @return [ Hash ] options The server options.
      attr_reader :options

      # The interval between regular server checks.
      #
      # @return [ Float ] The heartbeat interval, in seconds.
      def heartbeat_interval
        options[:heartbeat_interval] || DEFAULT_HEARTBEAT_INTERVAL
      end

      # @deprecated
      def_delegators :server, :last_scan

      # The compressor is determined during the handshake, so it must be an
      # attribute of the connection.
      #
      # @deprecated
      def_delegators :connection, :compressor

      # @return [ Monitoring ] monitoring The monitoring.
      attr_reader :monitoring

      # @return [ Server::PushMonitor | nil ] The push monitor, if one is being
      #   used.
      def push_monitor
        @update_mutex.synchronize do
          @push_monitor
        end
      end

      # Perform a check of the server.
      #
      # @since 2.0.0
      def do_work
        scan!
        # @next_wanted_scan may be updated by the push monitor.
        # However we need to check for termination flag so that the monitor
        # thread exits when requested.
        loop do
          delta = @next_wanted_scan - Time.now
          if delta > 0
            signaled = server.scan_semaphore.wait(delta)
            if signaled || @stop_requested
              break
            end
          else
            break
          end
        end
      end

      # Stop the background thread and wait for it to terminate for a
      # reasonable amount of time.
      #
      # @return [ true | false ] Whether the thread was terminated.
      #
      # @api public for backwards compatibility only
      def stop!
        stop_push_monitor!

        # Forward super's return value
        super.tap do
          # Important: disconnect should happen after the background thread
          # terminates.
          connection&.disconnect!
        end
      end

      def create_push_monitor!(topology_version)
        @update_mutex.synchronize do
          if @push_monitor && !@push_monitor.running?
            @push_monitor = nil
          end

          @push_monitor ||= PushMonitor.new(
            self,
            topology_version,
            monitoring,
            **Utils.shallow_symbolize_keys(options.merge(
              socket_timeout: heartbeat_interval + connection.socket_timeout,
              app_metadata: options[:push_monitor_app_metadata],
              check_document: @connection.check_document
            )),
          )
        end
      end

      def stop_push_monitor!
        @update_mutex.synchronize do
          if @push_monitor
            @push_monitor.stop!
            @push_monitor = nil
          end
        end
      end

      # Perform a check of the server with throttling, and update
      # the server's description and average round trip time.
      #
      # If the server was checked less than MIN_SCAN_INTERVAL seconds
      # ago, sleep until MIN_SCAN_INTERVAL seconds have passed since the last
      # check. Then perform the check which involves running hello
      # on the server being monitored and updating the server description
      # as a result.
      #
      # @note If the system clock moves backwards, this method can sleep
      #   for a very long time.
      #
      # @note The return value of this method is deprecated. In version 3.0.0
      #   this method will not have a return value.
      #
      # @return [ Description ] The updated description.
      #
      # @since 2.0.0
      def scan!
        # Ordinarily the background thread would invoke this method.
        # But it is also possible to invoke scan! directly on a monitor.
        # Allow only one scan to be performed at a time.
        @mutex.synchronize do
          throttle_scan_frequency!

          begin
            result = do_scan
          rescue => e
            run_sdam_flow({}, scan_error: e)
          else
            run_sdam_flow(result)
          end
        end
      end

      def run_sdam_flow(result, awaited: false, scan_error: nil)
        @sdam_mutex.synchronize do
          old_description = server.description

          new_description = Description.new(
            server.address,
            result,
            average_round_trip_time: server.round_trip_time_calculator.average_round_trip_time,
            minimum_round_trip_time: server.round_trip_time_calculator.minimum_round_trip_time
          )

          server.cluster.run_sdam_flow(server.description, new_description, awaited: awaited, scan_error: scan_error)

          server.description.tap do |new_description|
            unless awaited
              if new_description.unknown? && !old_description.unknown?
                @next_earliest_scan = @next_wanted_scan = Time.now
              else
                @next_earliest_scan = Time.now + MIN_SCAN_INTERVAL
                @next_wanted_scan = Time.now + heartbeat_interval
              end
            end
          end
        end
      end

      # Restarts the server monitor unless the current thread is alive.
      #
      # @example Restart the monitor.
      #   monitor.restart!
      #
      # @return [ Thread ] The thread the monitor runs on.
      #
      # @since 2.1.0
      def restart!
        if @thread && @thread.alive?
          @thread
        else
          run!
        end
      end

      def to_s
        "#<#{self.class.name}:#{object_id} #{server.address}>"
      end

      private

      def pre_stop
        server.scan_semaphore.signal
      end

      def do_scan
        begin
          monitoring.publish_heartbeat(server) do
            check
          end
        rescue => exc
          msg = "Error checking #{server.address}"
          Utils.warn_bg_exception(msg, exc,
            logger: options[:logger],
            log_prefix: options[:log_prefix],
            bg_error_backtrace: options[:bg_error_backtrace],
          )
          raise exc
        end
      end

      def check
        if @connection && @connection.pid != Process.pid
          log_warn("Detected PID change - Mongo client should have been reconnected (old pid #{@connection.pid}, new pid #{Process.pid}")
          @connection.disconnect!
          @connection = nil
        end

        if @connection
          result = server.round_trip_time_calculator.measure do
            begin
              doc = @connection.check_document
              cmd = Protocol::Query.new(
                Database::ADMIN, Database::COMMAND, doc, :limit => -1
              )
              message = @connection.dispatch_bytes(cmd.serialize.to_s)
              message.documents.first
            rescue Mongo::Error
              @connection.disconnect!
              @connection = nil
              raise
            end
          end
        else
          connection = Connection.new(server.address, options)
          connection.connect!
          result = server.round_trip_time_calculator.measure do
            connection.handshake!
          end
          @connection = connection
          if tv_doc = result['topologyVersion']
            # Successful response, server 4.4+
            create_push_monitor!(TopologyVersion.new(tv_doc))
            push_monitor.run!
          else
            # Failed response or pre-4.4 server
            stop_push_monitor!
          end
          result
        end
        result
      end

      # @note If the system clock is set to a time in the past, this method
      #   can sleep for a very long time.
      def throttle_scan_frequency!
        delta = @next_earliest_scan - Time.now
        if delta > 0
          sleep(delta)
        end
      end
    end
  end
end

require 'mongo/server/monitor/connection'
require 'mongo/server/monitor/app_metadata'