1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384
|
# frozen_string_literal: true
# rubocop:todo all
require 'singleton'
# There is an undocumented {replSetStepUp: 1} command which can be used to
# ask a particular secondary to become a primary. It has existed since server
# 3.6 or earlier.
#
# Alternatively, to encourage a specific server to be selected, the recommended
# way is to set priority of that server higher. Changing priority requires
# reconfiguring the replica set, which in turn requires the replica set to
# have a primary.
#
# There are three timeouts that affect elections and stepdowns, when asking a
# server to step down:
#
# - secondaryCatchUpPeriodSecs - how long the existing primary will wait for
# secondaries to catch up prior to actually stepping down.
# - replSetStepDown parameter - how long the existing primary will decline
# getting elected as the new primary.
# - electionTimeoutMillis - how long, after a server notices that there is
# no primary, that server will vote or call elections.
#
# These parameters must be configured in a certain way:
#
# - replSetStepDown should generally be higher than secondaryCatchUpPeriodSecs.
# If a server is asked to step down and it spends all of its replSetStepDown
# time waiting for secondaries to catch up, the stepdown itself will not
# be performed and an error will be returned for the stepdown command.
# - secondaryCatchUpPeriodSecs + electionTimeoutMillis should be lower than
# replSetStepDown, so that all of the other servers can participate in
# the election prior to the primary which is stepping down becoming eligible
# to vote and potentially getting reelected.
#
# Settings used by this test:
#
# - replSetStepDown = 4 seconds
# - secondaryCatchUpPeriodSecs = 2 seconds
# - electionTimeoutMillis = 1 second
#
# Recommended guidance for working elections:
# - Set priority of all nodes other than old primary and new desired primary
# to 0
# - Turn off election handoff
# - Use stepdown & stepup commands (even when we don't care which server
# becomes the new primary)
# - Put step up command in retry loop
# Test-suite helper (singleton) for manipulating a live replica set
# deployment: forcing elections, stepping servers up and down, adjusting
# member priorities and freezing/unfreezing nodes. See the discussion of
# election-related timeouts in the file header comment.
class ClusterTools
  include Singleton

  # Asks the current primary to step down immediately. force: true makes
  # the primary step down even when no electable, caught-up secondary is
  # available.
  def force_step_down
    admin_client.database.command(
      replSetStepDown: 1, force: true)
  end

  # Turns election handoff on or off on every data-bearing server.
  # https://mongodb.com/docs/manual/reference/parameters/#param.enableElectionHandoff
  #
  # @param [ true | false ] value
  #
  # @raise [ ArgumentError ] if value is not a boolean.
  def set_election_handoff(value)
    unless [true, false].include?(value)
      raise ArgumentError, 'Value must be true or false'
    end
    direct_client_for_each_data_bearing_server do |client|
      client.use(:admin).database.command(setParameter: 1, enableElectionHandoff: value)
    end
  end

  # Sets election timeout to the specified value, in seconds.
  # Election timeout specifies how long nodes in a cluster wait to vote/ask
  # for elections once they lose connection with the active primary.
  #
  # This is in theory generally safe to do in the test suite and leave the
  # cluster at the 1 second setting, because the tests are run against a local
  # deployment which shouldn't have any elections in it at all, unless we are
  # testing step down behavior in which case we want the election timeout
  # to be low. In practice a low election timeout results in intermittent
  # test failures, therefore the timeout should be restored to its default
  # value once step down tests are complete.
  #
  # @param [ Numeric ] timeout Election timeout, in seconds.
  def set_election_timeout(timeout)
    cfg = get_rs_config
    # The server-side setting is in milliseconds.
    cfg['settings']['electionTimeoutMillis'] = timeout * 1000
    set_rs_config(cfg)
  end

  # Resets priorities on all replica set members to 1.
  #
  # Use at the end of a test run.
  def reset_priorities
    cfg = get_rs_config
    cfg['members'].each do |member|
      member['priority'] = 1
    end
    set_rs_config(cfg)
  end

  # Requests that the current primary in the RS steps down.
  #
  # Uses the replSetStepDown/secondaryCatchUpPeriodSecs values recommended
  # in the file header comment (4 and 2 seconds respectively).
  def step_down
    admin_client.database.command(
      replSetStepDown: 4, secondaryCatchUpPeriodSecs: 2)
  rescue Mongo::Error::OperationFailure::Family => e
    # While waiting for secondaries to catch up before stepping down, this
    # node decided to step down for other reasons (189). The primary is no
    # longer a primary either way, so this counts as success.
    if e.code == 189
      # success
    else
      raise
    end
  end

  # Attempts to elect the server at the specified address as the new primary
  # by asking it to step up.
  #
  # Retries on election failure (code 125) for up to 10 seconds, then
  # re-raises the last error.
  #
  # @param [ Mongo::Address ] address
  def step_up(address)
    client = direct_client(address)
    start = Mongo::Utils.monotonic_time
    loop do
      begin
        client.database.command(replSetStepUp: 1)
        break
      rescue Mongo::Error::OperationFailure::Family => e
        # Election failed. (125)
        if e.code == 125
          # Possible reason is the node we are trying to elect has deny-listed
          # itself. This is where {replSetFreeze: 0} should make it eligible
          # for election again but this seems to not always work.
        else
          raise
        end
        if Mongo::Utils.monotonic_time > start + 10
          raise e
        end
      end
    end
    # Force the driver to re-discover cluster topology on next selection.
    reset_server_states
  end

  # The recommended guidance for changing a primary is:
  #
  # - turn off election handoff
  # - pick a server to be the new primary
  # - set the target's priority to 10, existing primary's priority to 1,
  #   other servers' priorities to 0
  # - call step down on the existing primary
  # - call step up on the target in a loop until it becomes the primary
  def change_primary
    start = Mongo::Utils.monotonic_time
    existing_primary = admin_client.cluster.next_primary
    existing_primary_address = existing_primary.address
    # Any non-arbiter server other than the current primary will do.
    target = admin_client.cluster.servers_list.detect do |server|
      !server.arbiter? && server.address != existing_primary_address
    end
    cfg = get_rs_config
    cfg['members'].each do |member|
      member['priority'] = case member['host']
        when existing_primary_address.to_s
          1
        when target.address.to_s
          10
        else
          0
        end
    end
    set_rs_config(cfg)
    if unfreeze_server(target.address)
      # Target server self-elected as primary, no further action is needed.
      return
    end
    step_down
    persistently_step_up(target.address)
    new_primary = admin_client.cluster.next_primary
    puts "#{Time.now} [CT] Primary changed to #{new_primary.address}. Time to change primaries: #{Mongo::Utils.monotonic_time - start}"
  end

  # Repeatedly asks the server at the specified address to step up until
  # the driver observes it as the primary, or raises after 10 seconds.
  #
  # @param [ Mongo::Address ] address
  def persistently_step_up(address)
    start = Mongo::Utils.monotonic_time
    loop do
      puts "#{Time.now} [CT] Asking #{address} to step up"
      step_up(address)
      if admin_client.cluster.next_primary.address == address
        break
      end
      if Mongo::Utils.monotonic_time - start > 10
        raise "Unable to get #{address} instated as primary after 10 seconds"
      end
    end
  end

  # Attempts to elect the server at the specified address as the new primary
  # by manipulating priorities.
  #
  # This method requires that there is an active primary in the replica set at
  # the time of the call (presumably a different one).
  #
  # @param [ Mongo::Address ] address
  def force_primary(address)
    current_primary = admin_client.cluster.next_primary
    if current_primary.address == address
      raise "Attempting to set primary to #{address} but it is already the primary"
    end
    encourage_primary(address)
    if unfreeze_server(address)
      # Target server self-elected as primary, no further action is needed.
      return
    end
    step_down
    persistently_step_up(address)
    # Invalidate the cached primary so the next selection re-discovers it.
    admin_client.cluster.next_primary.unknown!
    new_primary = admin_client.cluster.next_primary
    if new_primary.address != address
      raise "Elected primary #{new_primary.address} is not what we wanted (#{address})"
    end
  end

  # Adjusts replica set configuration so that the next election is likely
  # to result in the server at the specified address becoming a primary.
  # Address should be a Mongo::Address object.
  #
  # This method requires that there is an active primary in the replica set at
  # the time of the call.
  #
  # @param [ Mongo::Address ] address
  #
  # @raise [ RuntimeError ] if no RS member matches the given address.
  def encourage_primary(address)
    existing_primary = admin_client.cluster.next_primary
    cfg = get_rs_config
    found = false
    cfg['members'].each do |member|
      if member['host'] == address.to_s
        member['priority'] = 10
        found = true
      elsif member['host'] == existing_primary.address.to_s
        member['priority'] = 1
      else
        member['priority'] = 0
      end
    end
    unless found
      raise "No RS member for #{address}"
    end
    set_rs_config(cfg)
  end

  # Allows the server at the specified address to run for elections and
  # potentially become a primary. Use after issuing a step down command
  # to clear the prohibition on the stepped down server to be a primary.
  #
  # Returns true if the server at address became a primary, such that
  # a step up command is not necessary.
  def unfreeze_server(address)
    begin
      direct_client(address).use('admin').database.command(replSetFreeze: 0)
    rescue Mongo::Error::OperationFailure::Family => e
      # Mongo::Error::OperationFailure: cannot freeze node when primary or
      # running for election. state: Primary (95)
      if e.code == 95
        # The server we want to become primary may have already become the
        # primary by holding a spontaneous election and winning due to the
        # priorities we have set.
        admin_client.cluster.servers_list.each do |server|
          server.unknown!
        end
        if admin_client.cluster.next_primary.address == address
          puts "#{Time.now} [CT] Primary self-elected to #{address}"
          return true
        end
      end
      raise
    end
    false
  end

  # Issues {replSetFreeze: 0} against every data-bearing server except the
  # current primary, making them all eligible for election again.
  def unfreeze_all
    admin_client.cluster.servers_list.each do |server|
      next if server.arbiter?
      client = direct_client(server.address)
      # Primary refuses to be unfrozen with this message:
      # cannot freeze node when primary or running for election. state: Primary (95)
      if server != admin_client.cluster.next_primary
        client.use('admin').database.command(replSetFreeze: 0)
      end
    end
  end

  # Gets the current replica set configuration.
  #
  # @return [ Hash ] The 'config' document from replSetGetConfig.
  #
  # @raise [ RuntimeError ] if the command did not succeed.
  def get_rs_config
    result = admin_client.database.command(replSetGetConfig: 1)
    doc = result.reply.documents.first
    if doc['ok'] != 1
      raise 'Failed to get RS config'
    end
    doc['config']
  end

  # Reconfigures the replica set with the specified configuration.
  # Automatically increases RS version in the process.
  #
  # @param [ Hash ] config An RS config document as returned by
  #   get_rs_config.
  #
  # @raise [ RuntimeError ] if the reconfiguration did not succeed.
  def set_rs_config(config)
    # Shallow dup: 'version' is replaced without mutating the caller's
    # top-level hash (nested members/settings are still shared).
    config = config.dup
    config['version'] += 1
    cmd = {replSetReconfig: config}
    # NOTE(review): lexicographic comparison is fine for single-digit minor
    # versions but would misorder e.g. '4.10' vs '4.4' — confirm acceptable.
    if ClusterConfig.instance.fcv_ish >= '4.4'
      # Workaround for https://jira.mongodb.org/browse/SERVER-46894
      cmd[:force] = true
    end
    result = admin_client.database.command(cmd)
    doc = result.reply.documents.first
    if doc['ok'] != 1
      raise 'Failed to reconfigure RS'
    end
  end

  # Returns a memoized client connected to the whole deployment, using the
  # admin database, reconnecting it first if it was closed.
  def admin_client
    # Since we are triggering elections, we need to have a higher server
    # selection timeout applied. The default timeout for tests assumes a
    # stable deployment.
    (
      @admin_client ||= ClientRegistry.instance.global_client('root_authorized').
        with(server_selection_timeout: 15).use(:admin)
    ).tap do |client|
      ClientRegistry.reconnect_client_if_perished(client)
    end
  end

  # Returns a memoized client connected directly to the server at the given
  # address (or in load-balanced mode when the test run is configured for
  # it), reconnecting it first if it was closed.
  #
  # @param [ Mongo::Address ] address
  # @param [ Hash ] options Extra client options; also part of the cache key.
  def direct_client(address, options = {})
    connect = if SpecConfig.instance.connect_options[:connect] == :load_balanced
      :load_balanced
    else
      :direct
    end
    @direct_clients ||= {}
    cache_key = {address: address}.update(options)
    (
      @direct_clients[cache_key] ||= ClientRegistry.instance.new_local_client(
        [address.to_s],
        SpecConfig.instance.test_options.merge(
          SpecConfig.instance.auth_options).merge(
          connect: connect, server_selection_timeout: 10).merge(options))
    ).tap do |client|
      ClientRegistry.reconnect_client_if_perished(client)
    end
  end

  # Closes and forgets all cached clients (the admin client and all direct
  # clients). They will be recreated on next use.
  def close_clients
    if @admin_client
      @admin_client.close
      @admin_client = nil
    end
    if @direct_clients
      @direct_clients.each do |cache_key, client|
        client.close
      end
      @direct_clients = nil
    end
  end

  # Yields every server known to the admin client's cluster.
  def each_server(&block)
    admin_client.cluster.servers_list.each(&block)
  end

  # Yields a direct client for every data-bearing (non-arbiter) server.
  def direct_client_for_each_data_bearing_server(&block)
    each_server do |server|
      next if server.arbiter?
      yield direct_client(server.address)
    end
  end

  private

  # Marks every server unknown so the driver re-runs topology discovery
  # on the next server selection.
  def reset_server_states
    each_server do |server|
      server.unknown!
    end
  end
end
|