File: server_selection.rb

package info (click to toggle)
ruby-mongo 2.21.3-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 14,764 kB
  • sloc: ruby: 108,806; makefile: 5; sh: 2
file content (365 lines) | stat: -rw-r--r-- 13,232 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
# frozen_string_literal: true
# rubocop:todo all

module Mongo
  module ServerSelection
    module Read

      # Represents a Server Selection specification test.
      #
      # @since 2.0.0
      class Spec

        # Mapping of read preference modes.
        #
        # @since 2.0.0
        READ_PREFERENCES = {
          'Primary' => :primary,
          'Secondary' => :secondary,
          'PrimaryPreferred' => :primary_preferred,
          'SecondaryPreferred' => :secondary_preferred,
          'Nearest' => :nearest,
        }

        # @return [ String ] description The spec description.
        #
        # @since 2.0.0
        attr_reader :description

        # @return [ Hash ] read_preference The read preference to be used for selection.
        #
        # @since 2.0.0
        attr_reader :read_preference

        # @return [ Integer ] heartbeat_frequency The heartbeat frequency to be set on the client.
        #
        # @since 2.4.0
        attr_reader :heartbeat_frequency

        # @return [ Integer ] max_staleness The max_staleness.
        #
        # @since 2.4.0
        attr_reader :max_staleness

        # @return [ Array<Hash> ] eligible_servers The eligible servers before the latency
        #   window is taken into account.
        #
        # @since 2.0.0
        attr_reader :eligible_servers

        # @return [ Array<Hash> ] suitable_servers The set of servers matching all server
        #  selection logic. May be a subset of eligible_servers and/or candidate_servers.
        #
        # @since 2.0.0
        attr_reader :suitable_servers

        # @return [ Mongo::Cluster::Topology ] type The topology type.
        #
        # @since 2.0.0
        attr_reader :type

        # Instantiate the new spec.
        #
        # @param [ String ] test_path The path to the file.
        #
        # @since 2.0.0
        def initialize(test_path)
          @test = ::Utils.load_spec_yaml_file(test_path)
          @description = "#{@test['topology_description']['type']}: #{File.basename(test_path)}"
          @heartbeat_frequency = @test['heartbeatFrequencyMS'] / 1000 if @test['heartbeatFrequencyMS']
          @read_preference = @test['read_preference']
          @read_preference['mode'] = READ_PREFERENCES[@read_preference['mode']]
          @max_staleness = @read_preference['maxStalenessSeconds']
          @candidate_servers = @test['topology_description']['servers']
          @suitable_servers = @test['suitable_servers'] || []
          @in_latency_window = @test['in_latency_window'] || []
          @type = Mongo::Cluster::Topology.const_get(@test['topology_description']['type'])
        end

        # Does this spec expect a server to be found.
        #
        # @example Will a server be found with this spec.
        #   spec.server_available?
        #
        # @return [true, false] If a server will be found with this spec.
        #
        # @since 2.0.0
        def server_available?
          !in_latency_window.empty?
        end

        # Whether the test requires an error to be raised during server selection.
        #
        # @return [ true, false ] Whether the test expects an error.
        def error?
          @test['error']
        end

        # The subset of suitable servers that falls within the allowable latency
        #   window.
        # We have to correct for our server selection algorithm that adds the primary
        #  to the end of the list for SecondaryPreferred read preference mode.
        #
        # @example Get the list of suitable servers within the latency window.
        #   spec.in_latency_window
        #
        # @return [ Array<Hash> ] The servers within the latency window.
        #
        # @since 2.0.0
        def in_latency_window
          @in_latency_window
        end

        # The servers a topology would return as candidates for selection.
        #
        # @return [ Array<Hash> ] candidate_servers The candidate servers.
        #
        # @since 2.0.0
        def candidate_servers
          @candidate_servers
        end
      end
    end
  end
end

def define_server_selection_spec_tests(test_paths)
  # Linter insists that a server selection semaphore is present when
  # performing server selection.
  require_no_linting

  test_paths.each do |file|

    spec = Mongo::ServerSelection::Read::Spec.new(file)

    context(spec.description) do
      # Cluster needs a topology and topology needs a cluster...
      # This temporary cluster is used for topology construction.
      let(:temp_cluster) do
        double('temp cluster').tap do |cluster|
          allow(cluster).to receive(:servers_list).and_return([])
        end
      end

      let(:topology) do
        options = if spec.type <= Mongo::Cluster::Topology::ReplicaSetNoPrimary
          {replica_set_name: 'foo'}
        else
          {}
        end
        spec.type.new(options, monitoring, temp_cluster)
      end

      let(:monitoring) do
        Mongo::Monitoring.new(monitoring: false)
      end

      let(:listeners) do
        Mongo::Event::Listeners.new
      end

      let(:options) do
        if spec.heartbeat_frequency
          {server_selection_timeout: 0.1, heartbeat_frequency: spec.heartbeat_frequency}
        else
          {server_selection_timeout: 0.1}
        end
      end

      let(:cluster) do
        double('cluster').tap do |c|
          allow(c).to receive(:server_selection_semaphore)
          allow(c).to receive(:connected?).and_return(true)
          allow(c).to receive(:summary)
          allow(c).to receive(:topology).and_return(topology)
          allow(c).to receive(:single?).and_return(topology.single?)
          allow(c).to receive(:sharded?).and_return(topology.sharded?)
          allow(c).to receive(:replica_set?).and_return(topology.replica_set?)
          allow(c).to receive(:unknown?).and_return(topology.unknown?)
          allow(c).to receive(:options).and_return(options)
          allow(c).to receive(:scan!).and_return(true)
          allow(c).to receive(:app_metadata).and_return(app_metadata)
          allow(c).to receive(:heartbeat_interval).and_return(
            spec.heartbeat_frequency || Mongo::Server::Monitor::DEFAULT_HEARTBEAT_INTERVAL)
        end
      end

      # One of the spec test assertions is on the set of servers that are
      # eligible for selection without taking latency into account.
      # In the driver, latency is taken into account at various points during
      # server selection, hence there isn't a method that can be called to
      # retrieve the list of servers without accounting for latency.
      # Work around this by executing server selection with all servers set
      # to zero latency, when evaluating the candidate server set.
      let(:ignore_latency) { false }

      let(:candidate_servers) do
        spec.candidate_servers.collect do |server|
          features = double('features').tap do |feat|
            allow(feat).to receive(:max_staleness_enabled?).and_return(server['maxWireVersion'] && server['maxWireVersion'] >= 5)
            allow(feat).to receive(:check_driver_support!).and_return(true)
          end
          address = Mongo::Address.new(server['address'])
          Mongo::Server.new(address, cluster, monitoring, listeners,
            {monitoring_io: false}.update(options)
          ).tap do |s|
            allow(s).to receive(:average_round_trip_time) do
              if ignore_latency
                0
              elsif server['avg_rtt_ms']
                server['avg_rtt_ms'] / 1000.0
              end
            end
            allow(s).to receive(:tags).and_return(server['tags'])
            allow(s).to receive(:secondary?).and_return(server['type'] == 'RSSecondary')
            allow(s).to receive(:primary?).and_return(server['type'] == 'RSPrimary')
            allow(s).to receive(:mongos?).and_return(server['type'] == 'Mongos')
            allow(s).to receive(:standalone?).and_return(server['type'] == 'Standalone')
            allow(s).to receive(:unknown?).and_return(server['type'] == 'Unknown')
            allow(s).to receive(:connectable?).and_return(true)
            allow(s).to receive(:last_write_date).and_return(
              Time.at(server['lastWrite']['lastWriteDate']['$numberLong'].to_f / 1000)) if server['lastWrite']
            allow(s).to receive(:last_scan).and_return(
              Time.at(server['lastUpdateTime'].to_f / 1000))
            allow(s).to receive(:features).and_return(features)
            allow(s).to receive(:replica_set_name).and_return('foo')
          end
        end
      end

      let(:suitable_servers) do
        spec.suitable_servers.collect do |server|
          Mongo::Server.new(Mongo::Address.new(server['address']), cluster, monitoring, listeners,
            options.merge(monitoring_io: false))
        end
      end

      let(:in_latency_window) do
        spec.in_latency_window.collect do |server|
          Mongo::Server.new(Mongo::Address.new(server['address']), cluster, monitoring, listeners,
            options.merge(monitoring_io: false))
        end
      end

      let(:server_selector_definition) do
        { mode: spec.read_preference['mode'] }.tap do |definition|
          definition[:tag_sets] = spec.read_preference['tag_sets']
          definition[:max_staleness] = spec.max_staleness if spec.max_staleness
        end
      end

      let(:server_selector) do
        Mongo::ServerSelector.get(server_selector_definition)
      end

      let(:app_metadata) do
        Mongo::Server::AppMetadata.new({})
      end

      before do
        allow(cluster).to receive(:servers_list).and_return(candidate_servers)
        allow(cluster).to receive(:servers) do
          # Copy Cluster#servers definition because clusters is a double
          cluster.topology.servers(cluster.servers_list)
        end
        allow(cluster).to receive(:addresses).and_return(candidate_servers.map(&:address))
      end

      if spec.error?

        it 'Raises an InvalidServerPreference exception' do

          expect do
            server_selector.select_server(cluster)
          end.to raise_exception(Mongo::Error::InvalidServerPreference)
        end

      else

        if spec.server_available?

          it 'has non-empty suitable servers' do
            spec.suitable_servers.should be_a(Array)
            spec.suitable_servers.should_not be_empty
          end

          if spec.in_latency_window.length == 1

            it 'selects the expected server' do
              [server_selector.select_server(cluster)].should == in_latency_window
            end

          else

            it 'selects a server in the suitable list' do
              in_latency_window.should include(server_selector.select_server(cluster))
            end

            let(:expected_addresses) do
              in_latency_window.map(&:address).map(&:seed).sort
            end

            let(:actual_addresses) do
              server_selector.suitable_servers(cluster).map(&:address).map(&:seed).sort
            end

            it 'identifies expected suitable servers' do
              actual_addresses.should == expected_addresses
            end

          end

          context 'candidate servers without taking latency into account' do
            let(:ignore_latency) { true }

            let(:expected_addresses) do
              suitable_servers.map(&:address).map(&:seed).sort
            end

            let(:actual_addresses) do
              servers = server_selector.send(:suitable_servers, cluster)

              # The tests expect that only secondaries are "suitable" for
              # server selection with secondary preferred read preference.
              # In actuality, primaries are also suitable, and the driver
              # returns the primaries also. Remove primaries from the
              # actual set when read preference is secondary preferred.
              # HOWEVER, if a test ends up selecting a primary, then it
              # includes that primary into its suitable servers. Therefore
              # only remove primaries when the number of suitable servers
              # is greater than 1.
              servers.delete_if do |server|
                server_selector.is_a?(Mongo::ServerSelector::SecondaryPreferred) &&
                  server.primary? &&
                  servers.length > 1
              end

              # Since we remove the latency requirement, the servers
              # may be returned in arbitrary order.
              servers.map(&:address).map(&:seed).sort
            end

            it 'identifies expected suitable servers' do
              actual_addresses.should == expected_addresses
            end
          end

        else

          # Runner does not handle non-empty suitable servers with
          # no servers in latency window.
          it 'has empty suitable servers' do
            expect(spec.suitable_servers).to eq([])
          end

          it 'Raises a NoServerAvailable Exception' do
            expect do
              server_selector.select_server(cluster)
            end.to raise_exception(Mongo::Error::NoServerAvailable)
          end

        end
      end
    end
  end
end