File: base.rb

package info (click to toggle)
ruby-mongo 2.23.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 15,020 kB
  • sloc: ruby: 110,810; makefile: 5
file content (714 lines) | stat: -rw-r--r-- 27,686 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
# frozen_string_literal: true
# rubocop:todo all

# Copyright (C) 2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo

  module ServerSelector

    class Base

      # Initialize the server selector.
      #
      # @example Initialize the selector.
      #   Mongo::ServerSelector::Secondary.new(:tag_sets => [{'dc' => 'nyc'}])
      #
      # @example Initialize the preference with no options.
      #   Mongo::ServerSelector::Secondary.new
      #
      # @param [ Hash ] options The server preference options.
      #
      # @option options [ Integer ] :local_threshold The local threshold boundary for
      #  nearest selection in seconds.
      # @option options [ Integer ] :max_staleness The maximum replication lag,
      #   in seconds, that a secondary can suffer and still be eligible for a read.
      #   A value of -1 is treated identically to nil, which is to not
      #   have a maximum staleness.
      # @option options [ Hash | nil ] :hedge A Hash specifying whether to enable hedged
      #   reads on the server. Hedged reads are not enabled by default. When
      #   specifying this option, it must be in the format: { enabled: true },
      #   where the value of the :enabled key is a boolean value.
      #
      # @raise [ Error::InvalidServerPreference ] If tag sets are specified
      #   but not allowed.
      #
      # @api private
      def initialize(options = nil)
        options = options ? options.dup : {}
        if options[:max_staleness] == -1
          options.delete(:max_staleness)
        end
        @options = options
        @tag_sets = options[:tag_sets] || []
        @max_staleness = options[:max_staleness]
        @hedge = options[:hedge]

        validate!
      end

      # @return [ Hash ] options The options.
      attr_reader :options

      # @return [ Array ] tag_sets The tag sets used to select servers.
      attr_reader :tag_sets

      # @return [ Integer ] max_staleness The maximum replication lag, in
      #   seconds, that a secondary can suffer and still be eligible for a read.
      #
      # @since 2.4.0
      attr_reader :max_staleness

      # @return [ Hash | nil ] hedge The document specifying whether to enable
      #   hedged reads.
      attr_reader :hedge

      # Get the timeout for server selection.
      #
      # @example Get the server selection timeout, in seconds.
      #   selector.server_selection_timeout
      #
      # @return [ Float ] The timeout.
      #
      # @since 2.0.0
      #
      # @deprecated This setting is now taken from the cluster options when
      #   a server is selected. Will be removed in version 3.0.
      def server_selection_timeout
        @server_selection_timeout ||=
          (options[:server_selection_timeout] || ServerSelector::SERVER_SELECTION_TIMEOUT)
      end

      # Get the local threshold boundary for nearest selection in seconds.
      #
      # @example Get the local threshold.
      #   selector.local_threshold
      #
      # @return [ Float ] The local threshold.
      #
      # @since 2.0.0
      #
      # @deprecated This setting is now taken from the cluster options when
      #   a server is selected. Will be removed in version 3.0.
      def local_threshold
        @local_threshold ||= (options[:local_threshold] || ServerSelector::LOCAL_THRESHOLD)
      end

      # @api private
      def local_threshold_with_cluster(cluster)
        options[:local_threshold] || cluster.options[:local_threshold] || LOCAL_THRESHOLD
      end

      # Inspect the server selector.
      #
      # @example Inspect the server selector.
      #   selector.inspect
      #
      # @return [ String ] The inspection.
      #
      # @since 2.2.0
      def inspect
        "#<#{self.class.name}:0x#{object_id} tag_sets=#{tag_sets.inspect} max_staleness=#{max_staleness.inspect} hedge=#{hedge}>"
      end

      # Check equality of two server selectors.
      #
      # @example Check server selector equality.
      #   preference == other
      #
      # @param [ Object ] other The other preference.
      #
      # @return [ true, false ] Whether the objects are equal.
      #
      # @since 2.0.0
      def ==(other)
        name == other.name && hedge == other.hedge &&
          max_staleness == other.max_staleness && tag_sets == other.tag_sets
      end

      # Select a server from the specified cluster, taking into account
      # mongos pinning for the specified session.
      #
      # If the session is given and has a pinned server, this server is the
      # only server considered for selection. If the server is of type mongos,
      # it is returned immediately; otherwise monitoring checks on this
      # server are initiated to update its status, and if the server becomes
      # a mongos within the server selection timeout, it is returned.
      #
      # If no session is given or the session does not have a pinned server,
      # normal server selection process is performed among all servers in the
      # specified cluster matching the preference of this server selector
      # object. Monitoring checks are initiated on servers in the cluster until
      # a suitable server is found, up to the server selection timeout.
      #
      # If a suitable server is not found within the server selection timeout,
      # this method raises Error::NoServerAvailable.
      #
      # @param [ Mongo::Cluster ] cluster The cluster from which to select
      #   an eligible server.
      # @param [ true, false ] ping Whether to ping the server before selection.
      #   Deprecated and ignored.
      # @param [ Session | nil ] session Optional session to take into account
      #   for mongos pinning. Added in version 2.10.0.
      # @param [ true | false ] write_aggregation Whether we need a server that
      #   supports writing aggregations (e.g. with $merge/$out) on secondaries.
      # @param [ Array<Server> ] deprioritized A list of servers that should
      #   be selected from only if no other servers are available. This is
      #   used to avoid selecting the same server twice in a row when
      #   retrying a command.
      # @param [ Float | nil ] :timeout Timeout in seconds for the operation,
      #   if any.
      #
      # @return [ Mongo::Server ] A server matching the server preference.
      #
      # @raise [ Error::NoServerAvailable ] No server was found matching the
      #   specified preference / pinning requirement in the server selection
      #   timeout.
      # @raise [ Error::LintError ] An unexpected condition was detected, and
      #   lint mode is enabled.
      #
      # @since 2.0.0
      def select_server(
        cluster,
        ping = nil,
        session = nil,
        write_aggregation: false,
        deprioritized: [],
        timeout: nil
      )
        select_server_impl(cluster, ping, session, write_aggregation, deprioritized, timeout).tap do |server|
          if Lint.enabled? && !server.pool.ready?
            raise Error::LintError, 'Server selector returning a server with a pool which is not ready'
          end
        end
      end

      # Parameters and return values are the same as for select_server, only
      # the +timeout+ param is renamed to +csot_timeout+.
      private def select_server_impl(cluster, ping, session, write_aggregation, deprioritized, csot_timeout)
        if cluster.topology.is_a?(Cluster::Topology::LoadBalanced)
          return cluster.servers.first
        end

        timeout = cluster.options[:server_selection_timeout] || SERVER_SELECTION_TIMEOUT

        server_selection_timeout = if csot_timeout && csot_timeout > 0
                                     [timeout, csot_timeout].min
                                   else
                                     timeout
                                   end

        # Special handling for zero timeout: if we have to select a server,
        # and the timeout is zero, fail immediately (since server selection
        # will take some non-zero amount of time in any case).
        if server_selection_timeout == 0
          msg = "Failing server selection due to zero timeout. " +
            " Requested #{name} in cluster: #{cluster.summary}"
          raise Error::NoServerAvailable.new(self, cluster, msg)
        end

        deadline = Utils.monotonic_time + server_selection_timeout

        if session && session.pinned_server
          if Mongo::Lint.enabled?
            unless cluster.sharded?
              raise Error::LintError, "Session has a pinned server in a non-sharded topology: #{topology}"
            end
          end

          if !session.in_transaction?
            session.unpin
          end

          if server = session.pinned_server
            # Here we assume that a mongos stays in the topology indefinitely.
            # This will no longer be the case once SRV polling is implemented.

            unless server.mongos?
              while (time_remaining = deadline - Utils.monotonic_time) > 0
                wait_for_server_selection(cluster, time_remaining)
              end

              unless server.mongos?
                msg = "The session being used is pinned to the server which is not a mongos: #{server.summary} " +
                  "(after #{server_selection_timeout} seconds)"
                raise Error::NoServerAvailable.new(self, cluster, msg)
              end
            end

            return server
          end
        end

        if cluster.replica_set?
          validate_max_staleness_value_early!
        end

        if cluster.addresses.empty?
          if Lint.enabled?
            unless cluster.servers.empty?
              raise Error::LintError, "Cluster has no addresses but has servers: #{cluster.servers.map(&:inspect).join(', ')}"
            end
          end
          msg = "Cluster has no addresses, and therefore will never have a server"
          raise Error::NoServerAvailable.new(self, cluster, msg)
        end

=begin Add this check in version 3.0.0
        unless cluster.connected?
          msg = 'Cluster is disconnected'
          raise Error::NoServerAvailable.new(self, cluster, msg)
        end
=end

        loop do
          if Lint.enabled?
            cluster.servers.each do |server|
              # TODO: Add this back in RUBY-3174.
              # if !server.unknown? && !server.connected?
              #   raise Error::LintError, "Server #{server.summary} is known but is not connected"
              # end
              if !server.unknown? && !server.pool.ready?
                raise Error::LintError, "Server #{server.summary} is known but has non-ready pool"
              end
            end
          end

          server = try_select_server(cluster, write_aggregation: write_aggregation, deprioritized: deprioritized)

          if server
            unless cluster.topology.compatible?
              raise Error::UnsupportedFeatures, cluster.topology.compatibility_error.to_s
            end

            if session && session.starting_transaction? && cluster.sharded?
              session.pin_to_server(server)
            end

            return server
          end

          cluster.scan!(false)

          time_remaining = deadline - Utils.monotonic_time
          if time_remaining > 0
            wait_for_server_selection(cluster, time_remaining)

            # If we wait for server selection, perform another round of
            # attempting to locate a suitable server. Otherwise server selection
            # can raise NoServerAvailable message when the diagnostics
            # reports an available server of the requested type.
          else
            break
          end
        end

        msg = "No #{name} server"
        if is_a?(ServerSelector::Secondary) && !tag_sets.empty?
          msg += " with tag sets: #{tag_sets}"
        end
        msg += " is available in cluster: #{cluster.summary} " +
                "with timeout=#{server_selection_timeout}, " +
                "LT=#{local_threshold_with_cluster(cluster)}"
        msg += server_selection_diagnostic_message(cluster)
        raise Error::NoServerAvailable.new(self, cluster, msg)
      rescue Error::NoServerAvailable => e
        if session && session.in_transaction? && !session.committing_transaction?
          e.add_label('TransientTransactionError')
        end
        if session && session.committing_transaction?
          e.add_label('UnknownTransactionCommitResult')
        end
        raise e
      end

      # Tries to find a suitable server, returns the server if one is available
      # or nil if there isn't a suitable server.
      #
      # @param [ Mongo::Cluster ] cluster The cluster from which to select
      #   an eligible server.
      # @param [ true | false ] write_aggregation Whether we need a server that
      #   supports writing aggregations (e.g. with $merge/$out) on secondaries.
      # @param [ Array<Server> ] deprioritized A list of servers that should
      #   be selected from only if no other servers are available. This is
      #   used to avoid selecting the same server twice in a row when
      #   retrying a command.
      #
      # @return [ Server | nil ] A suitable server, if one exists.
      #
      # @api private
      def try_select_server(cluster, write_aggregation: false, deprioritized: [])
        servers = if write_aggregation && cluster.replica_set?
          # 1. Check if ALL servers in cluster support secondary writes.
          is_write_supported = cluster.servers.reduce(true) do |res, server|
            res && server.features.merge_out_on_secondary_enabled?
          end

          if is_write_supported
            # 2. If all servers support secondary writes, we respect read preference.
            suitable_servers(cluster)
          else
            # 3. Otherwise we fallback to primary for replica set.
            [cluster.servers.detect(&:primary?)]
          end
        else
          suitable_servers(cluster)
        end

        # This list of servers may be ordered in a specific way
        # by the selector (e.g. for secondary preferred, the first
        # server may be a secondary and the second server may be primary)
        # and we should take the first server here respecting the order
        server = suitable_server(servers, deprioritized)

        if server
          if Lint.enabled?
            # It is possible for a server to have a nil average RTT here
            # because the ARTT comes from description which may be updated
            # by a background thread while server selection is running.
            # Currently lint mode is not a public feature, if/when this
            # changes (https://jira.mongodb.org/browse/RUBY-1576) the
            # requirement for ARTT to be not nil would need to be removed.
            if server.average_round_trip_time.nil?
              raise Error::LintError, "Server #{server.address} has nil average rtt"
            end
          end
        end

        server
      end

      # Returns servers of acceptable types from the cluster.
      #
      # Does not perform staleness validation, staleness filtering or
      # latency filtering.
      #
      # @param [ Cluster ] cluster The cluster.
      #
      # @return [ Array<Server> ] The candidate servers.
      #
      # @api private
      def candidates(cluster)
        servers = cluster.servers
        servers.each do |server|
          validate_max_staleness_support!(server)
        end
        if cluster.single?
          servers
        elsif cluster.sharded?
          servers
        elsif cluster.replica_set?
          select_in_replica_set(servers)
        else
          # Unknown cluster - no servers
          []
        end
      end

      # Returns servers satisfying the server selector from the cluster.
      #
      # @param [ Cluster ] cluster The cluster.
      #
      # @return [ Array<Server> ] The suitable servers.
      #
      # @api private
      def suitable_servers(cluster)
        if cluster.single?
          candidates(cluster)
        elsif cluster.sharded?
          local_threshold = local_threshold_with_cluster(cluster)
          servers = candidates(cluster)
          near_servers(servers, local_threshold)
        elsif cluster.replica_set?
          validate_max_staleness_value!(cluster)
          candidates(cluster)
        else
          # Unknown cluster - no servers
          []
        end
      end

      private

      # Returns a server from the list of servers that is suitable for
      # executing the operation.
      #
      # @param [ Array<Server> ] servers The candidate servers.
      # @param [ Array<Server> ] deprioritized A list of servers that should
      #  be selected from only if no other servers are available.
      #
      # @return [ Server | nil ] The suitable server or nil if no suitable
      #  server is available.
      def suitable_server(servers, deprioritized)
        preferred = servers - deprioritized
        if preferred.empty?
          servers.first
        else
          preferred.first
        end
      end

      # Convert this server preference definition into a format appropriate
      #   for sending to a MongoDB server (i.e., as a command field).
      #
      # @return [ Hash ] The server preference formatted as a command field value.
      #
      # @since 2.0.0
      def full_doc
        @full_doc ||= begin
          preference = { :mode => self.class.const_get(:SERVER_FORMATTED_NAME) }
          preference.update(tags: tag_sets) unless tag_sets.empty?
          preference.update(maxStalenessSeconds: max_staleness) if max_staleness
          preference.update(hedge: hedge) if hedge
          preference
        end
      end

      # Select the primary from a list of provided candidates.
      #
      # @param [ Array ] candidates List of candidate servers to select the
      #   primary from.
      #
      # @return [ Array ] The primary.
      #
      # @since 2.0.0
      def primary(candidates)
        candidates.select do |server|
          server.primary?
        end
      end

      # Select the secondaries from a list of provided candidates.
      #
      # @param [ Array ] candidates List of candidate servers to select the
      #   secondaries from.
      #
      # @return [ Array ] The secondary servers.
      #
      # @since 2.0.0
      def secondaries(candidates)
        matching_servers = candidates.select(&:secondary?)
        matching_servers = filter_stale_servers(matching_servers, primary(candidates).first)
        matching_servers = match_tag_sets(matching_servers) unless tag_sets.empty?
        # Per server selection spec the server selected MUST be a random
        # one matching staleness and latency requirements.
        # Selectors always pass the output of #secondaries to #nearest
        # which shuffles the server list, fulfilling this requirement.
        matching_servers
      end

      # Select the near servers from a list of provided candidates, taking the
      #   local threshold into account.
      #
      # @param [ Array ] candidates List of candidate servers to select the
      #   near servers from.
      # @param [ Integer ] local_threshold Local threshold. This parameter
      #   will be required in driver version 3.0.
      #
      # @return [ Array ] The near servers.
      #
      # @since 2.0.0
      def near_servers(candidates = [], local_threshold = nil)
        return candidates if candidates.empty?

        # Average RTT on any server may change at any time by the server
        # monitor's background thread. ARTT may also become nil if the
        # server is marked unknown. Take a snapshot of ARTTs for the duration
        # of this method.

        candidates = candidates.map do |server|
          {server: server, artt: server.average_round_trip_time}
        end.reject do |candidate|
          candidate[:artt].nil?
        end

        return candidates if candidates.empty?

        nearest_candidate = candidates.min_by do |candidate|
          candidate[:artt]
        end

        # Default for legacy signarure
        local_threshold ||= self.local_threshold

        threshold = nearest_candidate[:artt] + local_threshold

        candidates.select do |candidate|
          candidate[:artt] <= threshold
        end.map do |candidate|
          candidate[:server]
        end.shuffle!
      end

      # Select the servers matching the defined tag sets.
      #
      # @param [ Array ] candidates List of candidate servers from which those
      #   matching the defined tag sets should be selected.
      #
      # @return [ Array ] The servers matching the defined tag sets.
      #
      # @since 2.0.0
      def match_tag_sets(candidates)
        matches = []
        tag_sets.find do |tag_set|
          matches = candidates.select { |server| server.matches_tag_set?(tag_set) }
          !matches.empty?
        end
        matches || []
      end

      def filter_stale_servers(candidates, primary = nil)
        return candidates unless @max_staleness

        # last_scan is filled out by the Monitor, and can be nil if a server
        # had its description manually set rather than being normally updated
        # via the SDAM flow. We don't handle the possibility of a nil
        # last_scan here.
        if primary
          candidates.select do |server|
            validate_max_staleness_support!(server)
            staleness = (server.last_scan - server.last_write_date) -
                        (primary.last_scan - primary.last_write_date)  +
                        server.cluster.heartbeat_interval
            staleness <= @max_staleness
          end
        else
          max_write_date = candidates.collect(&:last_write_date).max
          candidates.select do |server|
            validate_max_staleness_support!(server)
            staleness = max_write_date - server.last_write_date + server.cluster.heartbeat_interval
            staleness <= @max_staleness
          end
        end
      end

      def validate!
        if !@tag_sets.all? { |set| set.empty? } && !tags_allowed?
          raise Error::InvalidServerPreference.new(Error::InvalidServerPreference::NO_TAG_SUPPORT)
        elsif @max_staleness && !max_staleness_allowed?
          raise Error::InvalidServerPreference.new(Error::InvalidServerPreference::NO_MAX_STALENESS_SUPPORT)
        end

        if @hedge
          unless hedge_allowed?
            raise Error::InvalidServerPreference.new(Error::InvalidServerPreference::NO_HEDGE_SUPPORT)
          end

          unless @hedge.is_a?(Hash) && @hedge.key?(:enabled) &&
              [true, false].include?(@hedge[:enabled])
            raise Error::InvalidServerPreference.new(
              "`hedge` value (#{hedge}) is invalid - hedge must be a Hash in the " \
              "format { enabled: true }"
            )
          end
        end
      end

      def validate_max_staleness_support!(server)
        if @max_staleness && !server.features.max_staleness_enabled?
          raise Error::InvalidServerPreference.new(Error::InvalidServerPreference::NO_MAX_STALENESS_WITH_LEGACY_SERVER)
        end
      end

      def validate_max_staleness_value_early!
        if @max_staleness
          unless @max_staleness >= SMALLEST_MAX_STALENESS_SECONDS
            msg = "`max_staleness` value (#{@max_staleness}) is too small - it must be at least " +
              "`Mongo::ServerSelector::SMALLEST_MAX_STALENESS_SECONDS` (#{ServerSelector::SMALLEST_MAX_STALENESS_SECONDS})"
            raise Error::InvalidServerPreference.new(msg)
          end
        end
      end

      def validate_max_staleness_value!(cluster)
        if @max_staleness
          heartbeat_interval = cluster.heartbeat_interval
          unless @max_staleness >= [
            SMALLEST_MAX_STALENESS_SECONDS,
            min_cluster_staleness = heartbeat_interval + Cluster::IDLE_WRITE_PERIOD_SECONDS,
          ].max
            msg = "`max_staleness` value (#{@max_staleness}) is too small - it must be at least " +
              "`Mongo::ServerSelector::SMALLEST_MAX_STALENESS_SECONDS` (#{ServerSelector::SMALLEST_MAX_STALENESS_SECONDS}) and (the cluster's heartbeat_frequency " +
              "setting + `Mongo::Cluster::IDLE_WRITE_PERIOD_SECONDS`) (#{min_cluster_staleness})"
            raise Error::InvalidServerPreference.new(msg)
          end
        end
      end

      # Waits for server state changes in the specified cluster.
      #
      # If the cluster has a server selection semaphore, waits on that
      # semaphore up to the specified remaining time. Any change in server
      # state resulting from SDAM will immediately wake up this method and
      # cause it to return.
      #
      # If the cluster does not have a server selection semaphore, waits
      # the smaller of 0.25 seconds and the specified remaining time.
      # This functionality is provided for backwards compatibility only for
      # applications directly invoking the server selection process.
      # If lint mode is enabled and the cluster does not have a server
      # selection semaphore, Error::LintError will be raised.
      #
      # @param [ Cluster ] cluster The cluster to wait for.
      # @param [ Numeric ] time_remaining Maximum time to wait, in seconds.
      def wait_for_server_selection(cluster, time_remaining)
        if cluster.server_selection_semaphore
          # Since the semaphore may have been signaled between us checking
          # the servers list earlier and the wait call below, we should not
          # wait for the full remaining time - wait for up to 0.5 second, then
          # recheck the state.
          cluster.server_selection_semaphore.wait([time_remaining, 0.5].min)
        else
          if Lint.enabled?
            raise Error::LintError, 'Waiting for server selection without having a server selection semaphore'
          end
          sleep [time_remaining, 0.25].min
        end
      end

      # Creates a diagnostic message when server selection fails.
      #
      # The diagnostic message includes the following information, as applicable:
      #
      # - Servers having dead monitor threads
      # - Cluster is disconnected
      #
      # If none of the conditions for diagnostic messages apply, an empty string
      # is returned.
      #
      # @param [ Cluster ] cluster The cluster on which server selection was
      #   performed.
      #
      # @return [ String ] The diagnostic message.
      def server_selection_diagnostic_message(cluster)
        msg = ''
        dead_monitors = []
        cluster.servers_list.each do |server|
          thread = server.monitor.instance_variable_get('@thread')
          if thread.nil? || !thread.alive?
            dead_monitors << server
          end
        end
        if dead_monitors.any?
          msg += ". The following servers have dead monitor threads: #{dead_monitors.map(&:summary).join(', ')}"
        end
        unless cluster.connected?
          msg += ". The cluster is disconnected (client may have been closed)"
        end
        msg
      end
    end
  end
end