File: cluster_tools.rb

package info (click to toggle)
ruby-mongo 2.21.3-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 14,764 kB
  • sloc: ruby: 108,806; makefile: 5; sh: 2
file content (384 lines) | stat: -rw-r--r-- 12,580 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
# frozen_string_literal: true
# rubocop:todo all

require 'singleton'

# There is an undocumented {replSetStepUp: 1} command which can be used to
# ask a particular secondary to become a primary. It has existed since server
# 3.6 or earlier.
#
# Alternatively, to encourage a specific server to be selected, the recommended
# way is to set priority of that server higher. Changing priority requires
# reconfiguring the replica set, which in turn requires the replica set to
# have a primary.
#
# There are three timeouts that affect elections and stepdowns, when asking a
# server to step down:
#
# - secondaryCatchUpPeriodSecs - how long the existing primary will wait for
#   secondaries to catch up prior to actually stepping down.
# - replSetStepDown parameter - how long the existing primary will decline
#   getting elected as the new primary.
# - electionTimeoutMillis - how long, after a server notices that there is
#   no primary, that server will vote or call elections.
#
# These parameters must be configured in a certain way:
#
# - replSetStepDown should generally be higher than secondaryCatchUpPeriodSecs.
#   If a server is asked to step down and it spends all of its replSetStepDown
#   time waiting for secondaries to catch up, the stepdown itself will not
#   be performed and an error will be returned for the stepdown command.
# - secondaryCatchUpPeriodSecs + electionTimeoutMillis should be lower than
#   replSetStepDown, so that all of the other servers can participate in
#   the election prior to the primary which is stepping down becoming eligible
#   to vote and potentially getting reelected.
#
# Settings used by this test:
#
# - replSetStepDown = 4 seconds
# - secondaryCatchUpPeriodSecs = 2 seconds
# - electionTimeoutMillis = 1 second
#
# Recommended guidance for working elections:
# - Set priority of all nodes other than old primary and new desired primary
#   to 0
# - Turn off election handoff
# - Use stepdown & stepup commands (even when we don't care which server becomes
#   the new primary)
# - Put step up command in retry loop

# Singleton helper used by the test suite to manipulate a replica set:
# stepping primaries down and up, adjusting member priorities, changing the
# election timeout and handing out admin/direct clients.
class ClusterTools
  include Singleton

  # Asks the server selected by the admin client to step down, passing
  # force: true and a step down period of 1 second.
  #
  # @return [ Object ] The result of the replSetStepDown command.
  def force_step_down
    admin_client.database.command(
      replSetStepDown: 1, force: true)
  end

  # Enables or disables election handoff on every data bearing server.
  #
  # https://mongodb.com/docs/manual/reference/parameters/#param.enableElectionHandoff
  #
  # @param [ true | false ] value The new enableElectionHandoff value.
  #
  # @raise [ ArgumentError ] If value is neither true nor false.
  def set_election_handoff(value)
    unless [true, false].include?(value)
      raise ArgumentError, 'Value must be true or false'
    end

    direct_client_for_each_data_bearing_server do |client|
      client.use(:admin).database.command(setParameter: 1, enableElectionHandoff: value)
    end
  end

  # Sets election timeout to the specified value, in seconds.
  # Election timeout specifies how long nodes in a cluster wait to vote/ask
  # for elections once they lose connection with the active primary.
  #
  # This is in theory generally safe to do in the test suite and leave the
  # cluster at the 1 second setting, because the tests are run against a local
  # deployment which shouldn't have any elections in it at all, unless we are
  # testing step down behavior in which case we want the election timeout
  # to be low. In practice a low election timeout results in intermittent
  # test failures, therefore the timeout should be restored to its default
  # value once step down tests are complete.
  #
  # @param [ Numeric ] timeout Election timeout in seconds; it is multiplied
  #   by 1000 and stored as settings.electionTimeoutMillis.
  def set_election_timeout(timeout)
    cfg = get_rs_config
    cfg['settings']['electionTimeoutMillis'] = timeout * 1000
    set_rs_config(cfg)
  end

  # Resets priorities on all replica set members to 1.
  #
  # Use at the end of a test run.
  def reset_priorities
    cfg = get_rs_config
    cfg['members'].each { |member| member['priority'] = 1 }
    set_rs_config(cfg)
  end

  # Requests that the current primary in the RS steps down.
  #
  # Uses a 4 second step down period and a 2 second secondary catch up
  # period. Error code 189 is treated as success; any other operation
  # failure is re-raised.
  def step_down
    admin_client.database.command(
      replSetStepDown: 4, secondaryCatchUpPeriodSecs: 2)
  rescue Mongo::Error::OperationFailure::Family => e
    # While waiting for secondaries to catch up before stepping down, this
    # node decided to step down for other reasons (189). The primary is gone
    # either way, which is what the caller asked for.
    raise unless e.code == 189
  end

  # Attempts to elect the server at the specified address as the new primary
  # by asking it to step up.
  #
  # The command is retried while it fails with code 125 and fewer than
  # 10 seconds have elapsed; any other operation failure is re-raised
  # immediately. On success all known servers are marked unknown.
  #
  # @param [ Mongo::Address ] address
  def step_up(address)
    client = direct_client(address)
    deadline = Mongo::Utils.monotonic_time + 10
    loop do
      begin
        client.database.command(replSetStepUp: 1)
        break
      rescue Mongo::Error::OperationFailure::Family => e
        # Election failed. (125)
        # Possible reason is the node we are trying to elect has deny-listed
        # itself. This is where {replSetFreeze: 0} should make it eligible
        # for election again but this seems to not always work.
        raise unless e.code == 125

        # Give up once the retry budget is exhausted.
        raise if Mongo::Utils.monotonic_time > deadline
      end
    end
    reset_server_states
  end

  # The recommended guidance for changing a primary is:
  #
  # - turn off election handoff
  # - pick a server to be the new primary
  # - set the target's priority to 10, existing primary's priority to 1,
  #   other servers' priorities to 0
  # - call step down on the existing primary
  # - call step up on the target in a loop until it becomes the primary
  #
  # The target is the first non-arbiter server in the cluster's server list
  # whose address differs from the current primary's.
  #
  # @raise [ RuntimeError ] If there is no such server; the replica set
  #   configuration is not modified in that case.
  def change_primary
    start = Mongo::Utils.monotonic_time
    existing_primary_address = admin_client.cluster.next_primary.address

    target = admin_client.cluster.servers_list.detect do |server|
      !server.arbiter? && server.address != existing_primary_address
    end
    # Without this check a deployment with nothing but a primary (and
    # arbiters) would be reconfigured and then fail on nil.address.
    unless target
      raise "No data-bearing server other than the current primary " \
        "#{existing_primary_address} is available to become the new primary"
    end

    primary_host = existing_primary_address.to_s
    target_host = target.address.to_s

    cfg = get_rs_config
    cfg['members'].each do |member|
      member['priority'] = case member['host']
      when primary_host
        1
      when target_host
        10
      else
        0
      end
    end
    set_rs_config(cfg)

    # Target server self-elected as primary, no further action is needed.
    return if unfreeze_server(target.address)

    step_down
    persistently_step_up(target.address)

    new_primary = admin_client.cluster.next_primary
    puts "#{Time.now} [CT] Primary changed to #{new_primary.address}. Time to change primaries: #{Mongo::Utils.monotonic_time - start}"
  end

  # Repeatedly asks the server at the specified address to step up until the
  # admin client reports it as the primary.
  #
  # @param [ Mongo::Address ] address
  #
  # @raise [ RuntimeError ] If the server is still not the primary after a
  #   step up attempt and more than 10 seconds have elapsed.
  def persistently_step_up(address)
    start = Mongo::Utils.monotonic_time
    loop do
      puts "#{Time.now} [CT] Asking #{address} to step up"

      step_up(address)

      break if admin_client.cluster.next_primary.address == address

      if Mongo::Utils.monotonic_time - start > 10
        raise "Unable to get #{address} instated as primary after 10 seconds"
      end
    end
  end

  # Attempts to elect the server at the specified address as the new primary
  # by manipulating priorities.
  #
  # This method requires that there is an active primary in the replica set at
  # the time of the call (presumably a different one).
  #
  # @param [ Mongo::Address ] address
  #
  # @raise [ RuntimeError ] If the server is already the primary, or if a
  #   different server ends up being the primary.
  def force_primary(address)
    current_primary = admin_client.cluster.next_primary
    if current_primary.address == address
      raise "Attempting to set primary to #{address} but it is already the primary"
    end
    encourage_primary(address)

    # Target server self-elected as primary, no further action is needed.
    return if unfreeze_server(address)

    step_down
    persistently_step_up(address)
    # Mark the selected primary unknown so that the next selection below is
    # made again rather than reusing the current description.
    admin_client.cluster.next_primary.unknown!
    new_primary = admin_client.cluster.next_primary
    if new_primary.address != address
      raise "Elected primary #{new_primary.address} is not what we wanted (#{address})"
    end
  end

  # Adjusts replica set configuration so that the next election is likely
  # to result in the server at the specified address becoming a primary.
  # Address should be a Mongo::Address object.
  #
  # The target gets priority 10, the existing primary priority 1 and every
  # other member priority 0.
  #
  # This method requires that there is an active primary in the replica set at
  # the time of the call.
  #
  # @param [ Mongo::Address ] address
  #
  # @raise [ RuntimeError ] If no replica set member has the given address
  #   as its host; the configuration is not written in that case.
  def encourage_primary(address)
    primary_host = admin_client.cluster.next_primary.address.to_s
    target_host = address.to_s

    cfg = get_rs_config
    found = false
    cfg['members'].each do |member|
      member['priority'] = case member['host']
      when target_host
        found = true
        10
      when primary_host
        1
      else
        0
      end
    end
    raise "No RS member for #{address}" unless found

    set_rs_config(cfg)
  end

  # Allows the server at the specified address to run for elections and
  # potentially become a primary. Use after issuing a step down command
  # to clear the prohibition on the stepped down server to be a primary.
  #
  # Returns true if the server at address became a primary, such that
  # a step up command is not necessary.
  #
  # @param [ Mongo::Address ] address
  #
  # @return [ true | false ] true if the server turned out to already be
  #   the primary, false if the replSetFreeze command succeeded.
  def unfreeze_server(address)
    direct_client(address).use('admin').database.command(replSetFreeze: 0)
    false
  rescue Mongo::Error::OperationFailure::Family => e
    # Mongo::Error::OperationFailure: cannot freeze node when primary or running for election. state: Primary (95)
    raise unless e.code == 95

    # The server we want to become primary may have already become the
    # primary by holding a spontaneous election and winning due to the
    # priorities we have set. Forget the known server states so that the
    # primary is looked up afresh.
    reset_server_states
    if admin_client.cluster.next_primary.address == address
      puts "#{Time.now} [CT] Primary self-elected to #{address}"
      return true
    end
    raise
  end

  # Sends replSetFreeze: 0 to every non-arbiter server other than the
  # current primary.
  def unfreeze_all
    each_server do |server|
      next if server.arbiter?
      # Primary refuses to be unfrozen with this message:
      # cannot freeze node when primary or running for election. state: Primary (95)
      next if server == admin_client.cluster.next_primary

      direct_client(server.address).use('admin').database.command(replSetFreeze: 0)
    end
  end

  # Gets the current replica set configuration.
  #
  # @return [ Hash ] The 'config' field of the replSetGetConfig reply.
  #
  # @raise [ RuntimeError ] If the reply's 'ok' field is not 1.
  def get_rs_config
    result = admin_client.database.command(replSetGetConfig: 1)
    doc = result.reply.documents.first
    raise 'Failed to get RS config' if doc['ok'] != 1
    doc['config']
  end

  # Reconfigures the replica set with the specified configuration.
  # Automatically increases RS version in the process.
  #
  # The version bump is applied to a shallow copy, so the 'version' entry of
  # the hash passed in is left unchanged.
  #
  # @param [ Hash ] config Replica set configuration document.
  #
  # @raise [ RuntimeError ] If the reply's 'ok' field is not 1.
  def set_rs_config(config)
    config = config.dup
    config['version'] += 1
    cmd = {replSetReconfig: config}
    if ClusterConfig.instance.fcv_ish >= '4.4'
      # Workaround for https://jira.mongodb.org/browse/SERVER-46894
      cmd[:force] = true
    end
    result = admin_client.database.command(cmd)
    doc = result.reply.documents.first
    raise 'Failed to reconfigure RS' if doc['ok'] != 1
  end

  # Returns the memoized client for the admin database, derived from the
  # 'root_authorized' global client. The client is passed to
  # ClientRegistry.reconnect_client_if_perished on every call.
  #
  # @return [ Object ] The admin client.
  def admin_client
    # Since we are triggering elections, we need to have a higher server
    # selection timeout applied. The default timeout for tests assumes a
    # stable deployment.
    (
      @admin_client ||= ClientRegistry.instance.global_client('root_authorized').
        with(server_selection_timeout: 15).use(:admin)
    ).tap do |client|
      ClientRegistry.reconnect_client_if_perished(client)
    end
  end

  # Returns a client connected only to the server at the specified address.
  # Clients are cached per address and options combination and are passed to
  # ClientRegistry.reconnect_client_if_perished on every call.
  #
  # @param [ Mongo::Address ] address
  # @param [ Hash ] options Extra client options, merged over the defaults.
  #
  # @return [ Object ] The direct client.
  def direct_client(address, options = {})
    # A load balanced deployment keeps its connect mode; everything else is
    # addressed directly.
    connect = if SpecConfig.instance.connect_options[:connect] == :load_balanced
      :load_balanced
    else
      :direct
    end
    @direct_clients ||= {}
    cache_key = {address: address}.update(options)
    (
      @direct_clients[cache_key] ||= ClientRegistry.instance.new_local_client(
        [address.to_s],
        SpecConfig.instance.test_options.merge(
          SpecConfig.instance.auth_options).merge(
          connect: connect, server_selection_timeout: 10).merge(options))
    ).tap do |client|
      ClientRegistry.reconnect_client_if_perished(client)
    end
  end

  # Closes the memoized admin client and all cached direct clients, and
  # forgets them so that they are recreated on next use.
  def close_clients
    if @admin_client
      @admin_client.close
      @admin_client = nil
    end
    if @direct_clients
      @direct_clients.each_value(&:close)
      @direct_clients = nil
    end
  end

  # Yields each server in the admin client's cluster server list.
  def each_server(&block)
    admin_client.cluster.servers_list.each(&block)
  end

  # Yields a direct client for each server that is not an arbiter.
  def direct_client_for_each_data_bearing_server(&block)
    each_server do |server|
      next if server.arbiter?
      yield direct_client(server.address)
    end
  end

  private

  # Marks every known server unknown.
  def reset_server_states
    each_server do |server|
      server.unknown!
    end
  end
end