File: monitoring.rb

package info (click to toggle)
ruby-mongo 2.21.3-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 14,764 kB
  • sloc: ruby: 108,806; makefile: 5; sh: 2
file content (379 lines) | stat: -rw-r--r-- 12,590 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
# frozen_string_literal: true
# rubocop:todo all

# Copyright (C) 2015-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo

  # The class defines behavior for the performance monitoring API.
  #
  # @since 2.1.0
  class Monitoring
    include Id

    # Topic under which command monitoring events are published.
    #
    # @since 2.1.0
    COMMAND = 'Command'.freeze

    # Topic under which connection pool (CMAP) events are published.
    #
    # @since 2.9.0
    CONNECTION_POOL = 'ConnectionPool'.freeze

    # Topic under which server closed events are published.
    #
    # @since 2.4.0
    SERVER_CLOSED = 'ServerClosed'.freeze

    # Topic under which server description changed events are published.
    #
    # @since 2.4.0
    SERVER_DESCRIPTION_CHANGED = 'ServerDescriptionChanged'.freeze

    # Topic under which server opening events are published.
    #
    # @since 2.4.0
    SERVER_OPENING = 'ServerOpening'.freeze

    # Topic under which topology changed events are published.
    #
    # @since 2.4.0
    TOPOLOGY_CHANGED = 'TopologyChanged'.freeze

    # Topic under which topology closed events are published.
    #
    # @since 2.4.0
    TOPOLOGY_CLOSED = 'TopologyClosed'.freeze

    # Topic under which topology opening events are published.
    #
    # @since 2.4.0
    TOPOLOGY_OPENING = 'TopologyOpening'.freeze

    # Topic under which server heartbeat events are published. The same
    # topic carries the heartbeat started, succeeded and failed events.
    #
    # @since 2.7.0
    SERVER_HEARTBEAT = 'ServerHeartbeat'.freeze

    # Produce a fresh operation id. Operation ids are used to link together
    # the events that belong to the same operation.
    #
    # @example Get the next operation id.
    #   Monitoring.next_operation_id
    #
    # @return [ Integer ] The next operation id.
    #
    # @since 2.1.0
    def self.next_operation_id
      # next_id is not defined in this file; presumably it is supplied by
      # the included Id module.
      next_id
    end

    # Subscription management shared by per-instance monitoring and the
    # global subscriber registry.
    #
    # The registry is a Hash that maps a topic to the Array of subscribers
    # registered for that topic, in subscription order.
    #
    # @since 2.6.0
    module Subscribable
      # Subscribe a listener to an event topic.
      #
      # @note It is possible to subscribe the same listener to the same topic
      #   multiple times, in which case the listener will be invoked as many
      #   times as it is subscribed, and the same number of unsubscribe calls
      #   is needed to remove it completely.
      #
      # @example Subscribe to the topic.
      #   monitoring.subscribe(Mongo::Monitoring::COMMAND, subscriber)
      #
      # @example Subscribe to the topic globally.
      #   Mongo::Monitoring::Global.subscribe(Mongo::Monitoring::COMMAND, subscriber)
      #
      # @param [ String ] topic The event topic.
      # @param [ Object ] subscriber The subscriber to handle the event.
      #
      # @return [ Array<Object> ] The subscribers now registered for the topic.
      #
      # @since 2.1.0
      def subscribe(topic, subscriber)
        subscribers_for(topic).push(subscriber)
      end

      # Unsubscribe a listener from an event topic.
      #
      # If the listener was subscribed to the event topic multiple times,
      # this call removes a single subscription.
      #
      # If the listener was not subscribed to the topic, this operation
      # is a no-op: no exception is raised and the registry is left
      # untouched (no empty entry is created for the topic).
      #
      # @note Global subscriber registry is separate from per-client
      #   subscriber registry. The same subscriber can be subscribed to
      #   events from a particular client as well as globally; unsubscribing
      #   globally will not unsubscribe that subscriber from the client
      #   it was explicitly subscribed to.
      #
      # @note Currently the list of global subscribers is copied into
      #   a client whenever the client is created. Thus unsubscribing a
      #   subscriber globally has no effect for existing clients - they will
      #   continue sending events to the unsubscribed subscriber.
      #
      # @example Unsubscribe from the topic.
      #   monitoring.unsubscribe(Mongo::Monitoring::COMMAND, subscriber)
      #
      # @example Unsubscribe from the topic globally.
      #   Mongo::Monitoring::Global.unsubscribe(Mongo::Monitoring::COMMAND, subscriber)
      #
      # @param [ String ] topic The event topic.
      # @param [ Object ] subscriber The subscriber to be unsubscribed.
      #
      # @return [ Object, nil ] The removed subscriber, or nil if it was not
      #   subscribed to the topic.
      #
      # @since 2.6.0
      def unsubscribe(topic, subscriber)
        # Read the registry directly rather than through subscribers_for so
        # that an unknown topic does not get an empty entry as a side effect.
        registered = subscribers[topic]
        return unless registered

        # Remove only the first match so that one call undoes one subscribe.
        position = registered.index(subscriber)
        registered.delete_at(position) if position
      end

      # Get the subscriber registry.
      #
      # @example Get all the subscribers.
      #   monitoring.subscribers
      #
      # @example Get all the global subscribers.
      #   Mongo::Monitoring::Global.subscribers
      #
      # @return [ Hash<String, Array<Object>> ] The subscribers, keyed by topic.
      #
      # @since 2.1.0
      def subscribers
        @subscribers ||= {}
      end

      # Determine if there are any subscribers for a particular event.
      #
      # This is a pure query: asking about a topic that has never been
      # subscribed to does not add an entry for it to the registry.
      #
      # @example Are there subscribers?
      #   monitoring.subscribers?(COMMAND)
      #
      # @example Are there global subscribers?
      #   Mongo::Monitoring::Global.subscribers?(COMMAND)
      #
      # @param [ String ] topic The event topic.
      #
      # @return [ true, false ] If there are subscribers for the topic.
      #
      # @since 2.1.0
      def subscribers?(topic)
        registered = subscribers[topic]
        registered ? !registered.empty? : false
      end

      private

      # Fetch the subscriber list for a topic, creating an empty list for
      # the topic on first use.
      #
      # @param [ String ] topic The event topic.
      #
      # @return [ Array<Object> ] The (mutable) list of subscribers.
      def subscribers_for(topic)
        subscribers[topic] ||= []
      end
    end

    # Registry of subscribers that apply to all Mongo clients.
    #
    # @note Global subscriptions must be established prior to creating
    #   clients. When a client is constructed it copies subscribers from
    #   the Global module; subsequent subscriptions or unsubscriptions
    #   on the Global module have no effect on already created clients.
    #
    # @since 2.1.0
    module Global
      # Mixing Subscribable into the singleton class makes its methods
      # callable on the module itself, e.g. Global.subscribe(topic, subscriber).
      class << self
        include Subscribable
      end
    end

    include Subscribable

    # Create a Monitoring instance and, unless monitoring is disabled,
    # register the global subscribers and the built-in log subscribers.
    #
    # @example Create the new monitoring.
    #   Monitoring.new(:monitoring => true)
    #
    # @param [ Hash ] options Options. Client constructor forwards its
    #   options to Monitoring constructor, although Monitoring recognizes
    #   only a subset of the options recognized by Client.
    # @option options [ true, false ] :monitoring If false is given, the
    #   Monitoring instance is initialized without global monitoring event
    #   subscribers and will not publish SDAM events. Command monitoring events
    #   will still be published, and the driver will still perform SDAM and
    #   monitor its cluster in order to perform server selection. Built-in
    #   driver logging of SDAM events will be disabled because it is
    #   implemented through SDAM event subscription. Client#subscribe will
    #   succeed for all event types, but subscribers to SDAM events will
    #   not be invoked. Values other than false result in default behavior
    #   which is to perform normal SDAM event publication.
    #
    # @since 2.1.0
    # @api private
    def initialize(options = {})
      @options = options
      return if options[:monitoring] == false

      # Copy whatever is registered globally at construction time into this
      # instance's own registry.
      Global.subscribers.each do |topic, listeners|
        listeners.each { |listener| subscribe(topic, listener) }
      end

      # Built-in log subscribers, one per topic.
      # CMAP events are not logged by default because this will create
      # log entries for every operation performed by the driver, hence
      # there is no CONNECTION_POOL => CmapLogSubscriber entry here.
      {
        COMMAND => CommandLogSubscriber,
        SERVER_OPENING => ServerOpeningLogSubscriber,
        SERVER_CLOSED => ServerClosedLogSubscriber,
        SERVER_DESCRIPTION_CHANGED => ServerDescriptionChangedLogSubscriber,
        TOPOLOGY_OPENING => TopologyOpeningLogSubscriber,
        TOPOLOGY_CHANGED => TopologyChangedLogSubscriber,
        TOPOLOGY_CLOSED => TopologyClosedLogSubscriber,
      }.each do |topic, log_subscriber_class|
        subscribe(topic, log_subscriber_class.new(options))
      end
    end

    # @api private
    attr_reader :options

    # Tell whether monitoring is enabled for this instance.
    #
    # @return [ true, false ] false only when the :monitoring option is
    #   false; true for any other value, including when the option is absent.
    #
    # @api private
    def monitoring?
      setting = options[:monitoring]
      setting != false
    end

    # Deliver an event to every subscriber of a topic by calling
    # +published+ on each of them, in subscription order.
    #
    # This method is used for event types which only have a single event
    # in them.
    #
    # @param [ String ] topic The event topic.
    # @param [ Event ] event The event to publish.
    #
    # @since 2.9.0
    def published(topic, event)
      listeners = subscribers_for(topic)
      listeners.each do |listener|
        listener.published(event)
      end
    end

    # Deliver a started event to every subscriber of a topic by calling
    # +started+ on each of them, in subscription order.
    #
    # This method is used for event types which have the started/succeeded/failed
    # events in them, such as command and heartbeat events.
    #
    # @example Publish a started event.
    #   monitoring.started(COMMAND, event)
    #
    # @param [ String ] topic The event topic.
    # @param [ Event ] event The event to publish.
    #
    # @since 2.1.0
    def started(topic, event)
      listeners = subscribers_for(topic)
      listeners.each do |listener|
        listener.started(event)
      end
    end

    # Deliver a succeeded event to every subscriber of a topic by calling
    # +succeeded+ on each of them, in subscription order.
    #
    # This method is used for event types which have the started/succeeded/failed
    # events in them, such as command and heartbeat events.
    #
    # @example Publish a succeeded event.
    #   monitoring.succeeded(COMMAND, event)
    #
    # @param [ String ] topic The event topic.
    # @param [ Event ] event The event to publish.
    #
    # @since 2.1.0
    def succeeded(topic, event)
      listeners = subscribers_for(topic)
      listeners.each do |listener|
        listener.succeeded(event)
      end
    end

    # Deliver a failed event to every subscriber of a topic by calling
    # +failed+ on each of them, in subscription order.
    #
    # This method is used for event types which have the started/succeeded/failed
    # events in them, such as command and heartbeat events.
    #
    # @example Publish a failed event.
    #   monitoring.failed(COMMAND, event)
    #
    # @param [ String ] topic The event topic.
    # @param [ Event ] event The event to publish.
    #
    # @since 2.1.0
    def failed(topic, event)
      listeners = subscribers_for(topic)
      listeners.each do |listener|
        listener.failed(event)
      end
    end

    # Run a heartbeat (the given block) and publish the matching heartbeat
    # events on the SERVER_HEARTBEAT topic.
    #
    # When monitoring is enabled, a ServerHeartbeatStarted event is published
    # before the block runs, followed by a ServerHeartbeatSucceeded event if
    # the block returns, or a ServerHeartbeatFailed event if the block raises
    # a StandardError. When monitoring is disabled the block is still run
    # but no events are published.
    #
    # @param [ Object ] server The server being checked; only its address
    #   is read here.
    # @param [ true, false ] awaited Passed through to every published event.
    #
    # @yield The heartbeat to perform.
    #
    # @return [ Object ] The value returned by the block.
    #
    # @raise [ StandardError ] Any error raised by the block is re-raised
    #   after the failed event has been published.
    #
    # @api private
    def publish_heartbeat(server, awaited: false)
      started_event = nil
      if monitoring?
        started_event = Event::ServerHeartbeatStarted.new(server.address, awaited: awaited)
        started(SERVER_HEARTBEAT, started_event)
      end

      # The duration we publish in heartbeat succeeded/failed events is
      # the time spent on the entire heartbeat. This could include time
      # to connect the socket (including TLS handshake), not just time
      # spent on hello call itself.
      # The spec at https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring-logging-and-monitoring.md
      # requires that the duration exposed here start from "sending the
      # message" (hello). This requirement does not make sense if,
      # for example, we were never able to connect to the server at all
      # and thus hello was never sent.
      began_at = Utils.monotonic_time

      # Only the heartbeat itself is guarded: errors raised while publishing
      # the started event above propagate without a failed event.
      begin
        outcome = yield
      rescue => error
        if monitoring?
          failure = Event::ServerHeartbeatFailed.new(
            server.address,
            Utils.monotonic_time - began_at,
            error,
            awaited: awaited,
            started_event: started_event,
          )
          failed(SERVER_HEARTBEAT, failure)
        end
        raise
      end

      # Reached only when the block returned normally, because the rescue
      # clause above always re-raises.
      if monitoring?
        success = Event::ServerHeartbeatSucceeded.new(
          server.address,
          Utils.monotonic_time - began_at,
          awaited: awaited,
          started_event: started_event,
        )
        succeeded(SERVER_HEARTBEAT, success)
      end

      outcome
    end

    private

    # Give a copy its own subscriber registry.
    #
    # The copy gets a new Hash with a duplicate of each topic's subscriber
    # Array, so subscribing or unsubscribing on the copy does not affect the
    # original (and vice versa). The subscriber objects themselves are shared.
    #
    # @param [ Monitoring ] original The instance being copied.
    def initialize_copy(original)
      @subscribers = original.subscribers.transform_values(&:dup)
    end
  end
end

require 'mongo/monitoring/event'
require 'mongo/monitoring/publishable'
require 'mongo/monitoring/command_log_subscriber'
require 'mongo/monitoring/cmap_log_subscriber'
require 'mongo/monitoring/sdam_log_subscriber'
require 'mongo/monitoring/server_description_changed_log_subscriber'
require 'mongo/monitoring/server_closed_log_subscriber'
require 'mongo/monitoring/server_opening_log_subscriber'
require 'mongo/monitoring/topology_changed_log_subscriber'
require 'mongo/monitoring/topology_opening_log_subscriber'
require 'mongo/monitoring/topology_closed_log_subscriber'
require 'mongo/monitoring/unified_sdam_log_subscriber'