1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403
|
# frozen_string_literal: true
# rubocop:todo all
module CommonShortcuts
module ClassMethods
# Declares a topology double, which is configured to accept summary
# calls as those are used in SDAM event creation
def declare_topology_double
let(:topology) do
double('topology').tap do |topology|
allow(topology).to receive(:summary)
end
end
end
# For tests which require clients to connect, clean slate asks all
# existing clients to be closed prior to the test execution.
# Note that clean_slate closes all clients for each test in the scope.
def clean_slate
before do
ClientRegistry.instance.close_all_clients
BackgroundThreadRegistry.instance.verify_empty!
end
end
# Similar to clean slate but closes clients once before all tests in
# the scope. Use when the tests do not create new clients but do not
# want any background output from previously existing clients.
def clean_slate_for_all
before(:all) do
ClientRegistry.instance.close_all_clients
BackgroundThreadRegistry.instance.verify_empty!
end
end
# If only the lite spec helper was loaded, this method does nothing.
# If the full spec helper was loaded, this method performs the same function
# as clean_state_for_all.
def clean_slate_for_all_if_possible
before(:all) do
if defined?(ClusterTools)
ClientRegistry.instance.close_all_clients
BackgroundThreadRegistry.instance.verify_empty!
end
end
end
# For some reason, there are tests which fail on evergreen either
# intermittently or reliably that always succeed locally.
# Debugging of tests in evergreen is difficult/impossible,
# thus this workaround.
def clean_slate_on_evergreen
before(:all) do
if SpecConfig.instance.ci?
ClientRegistry.instance.close_all_clients
end
end
end
# Applies environment variable overrides in +env+ to the global environment
# (+ENV+) for the duration of each test.
#
# If a key's value in +env+ is nil, this key is removed from +ENV+.
#
# When the test finishes, the values in original +ENV+ that were overridden
# by +env+ are restored. If a key was not in original +ENV+ and was
# overridden by +env+, this key is removed from +ENV+ after the test.
#
# If the environment variables are not known at test definition time
# but are determined at test execution time, pass a block instead of
# the +env+ parameter and return the desired environment variables as
# a Hash from the block.
def local_env(env = nil, &block)
around do |example|
env ||= block.call
# This duplicates ENV.
# Note that ENV.dup produces an Object which does not behave like
# the original ENV, and hence is not usable.
saved_env = ENV.to_h
env.each do |k, v|
if v.nil?
ENV.delete(k)
else
ENV[k] = v
end
end
begin
example.run
ensure
env.each do |k, v|
if saved_env.key?(k)
ENV[k] = saved_env[k]
else
ENV.delete(k)
end
end
end
end
end
def clear_ocsp_cache
before do
Mongo.clear_ocsp_cache
end
end
def with_ocsp_mock(ca_file_path, responder_cert_path, responder_key_path,
fault: nil, port: 8100
)
clear_ocsp_cache
around do |example|
args = [
SpecConfig.instance.ocsp_files_dir.join('ocsp_mock.py').to_s,
'--ca_file', ca_file_path.to_s,
'--ocsp_responder_cert', responder_cert_path.to_s,
'--ocsp_responder_key', responder_key_path.to_s,
'-p', port.to_s,
]
if SpecConfig.instance.client_debug?
# Use when debugging - tests run faster without -v.
args << '-v'
end
if fault
args += ['--fault', fault]
end
process = ChildProcess.new(*args)
process.io.inherit!
retried = false
begin
process.start
rescue
if retried
raise
else
sleep 1
retried = true
retry
end
end
begin
sleep 0.4
example.run
ensure
if process.exited?
raise "Spawned process exited before we stopped it"
end
process.stop
process.wait
end
end
end
def with_openssl_debug
around do |example|
v = OpenSSL.debug
OpenSSL.debug = true
begin
example.run
ensure
OpenSSL.debug = v
end
end
end
end
module InstanceMethods
def kill_all_server_sessions
begin
ClientRegistry.instance.global_client('root_authorized').command(killAllSessions: [])
# killAllSessions also kills the implicit session which the driver uses
# to send this command, as a result it always fails
rescue Mongo::Error::OperationFailure::Family => e
# "operation was interrupted"
unless e.code == 11601
raise
end
end
end
def wait_for_all_servers(cluster)
# Cluster waits for initial round of sdam until the primary
# is discovered, which means by the time a connection is obtained
# here some of the servers in the topology may still be unknown.
# This messes with event expectations below. Therefore, wait for
# all servers in the topology to be checked.
#
# This wait here assumes all addresses specified for the test
# suite are for working servers of the cluster; if this is not
# the case, this test will fail due to exceeding the general
# test timeout eventually.
while cluster.servers_list.any? { |server| server.unknown? }
warn "Waiting for unknown servers in #{cluster.summary}"
sleep 0.25
end
end
def make_server(mode, options = {})
tags = options[:tags] || {}
average_round_trip_time = if mode == :unknown
nil
else
options[:average_round_trip_time] || 0
end
if mode == :unknown
config = {}
else
config = {
'isWritablePrimary' => mode == :primary,
'secondary' => mode == :secondary,
'arbiterOnly' => mode == :arbiter,
'isreplicaset' => mode == :ghost,
'hidden' => mode == :other,
'msg' => mode == :mongos ? 'isdbgrid' : nil,
'tags' => tags,
'ok' => 1,
'minWireVersion' => 2, 'maxWireVersion' => 8,
}
if [:primary, :secondary, :arbiter, :other].include?(mode)
config['setName'] = 'mongodb_set'
end
end
listeners = Mongo::Event::Listeners.new
monitoring = Mongo::Monitoring.new
address = options[:address]
cluster = double('cluster')
allow(cluster).to receive(:topology).and_return(topology)
allow(cluster).to receive(:app_metadata)
allow(cluster).to receive(:options).and_return({})
allow(cluster).to receive(:run_sdam_flow)
allow(cluster).to receive(:monitor_app_metadata)
allow(cluster).to receive(:push_monitor_app_metadata)
allow(cluster).to receive(:heartbeat_interval).and_return(10)
server = Mongo::Server.new(address, cluster, monitoring, listeners,
monitoring_io: false)
# Since the server references a double for the cluster, the server
# must be closed in the scope of the example.
register_server(server)
description = Mongo::Server::Description.new(
address, config,
average_round_trip_time: average_round_trip_time,
)
server.tap do |s|
allow(s).to receive(:description).and_return(description)
end
end
def make_protocol_reply(payload)
Mongo::Protocol::Reply.new.tap do |reply|
reply.instance_variable_set('@flags', [])
reply.instance_variable_set('@documents', [payload])
end
end
def make_not_master_reply
make_protocol_reply(
'ok' => 0, 'code' => 10107, 'errmsg' => 'not master'
)
end
def make_node_recovering_reply
make_protocol_reply(
'ok' => 0, 'code' => 11602, 'errmsg' => 'InterruptedDueToStepDown'
)
end
def make_node_shutting_down_reply
make_protocol_reply(
'ok' => 0, 'code' => 91, 'errmsg' => 'shutdown in progress'
)
end
def register_cluster(cluster)
finalizer = lambda do |cluster|
cluster.close
end
LocalResourceRegistry.instance.register(cluster, finalizer)
end
def register_server(server)
finalizer = lambda do |server|
if server.connected?
server.close
end
end
LocalResourceRegistry.instance.register(server, finalizer)
end
def register_background_thread_object(bgt_object)
finalizer = lambda do |bgt_object|
bgt_object.stop!
end
LocalResourceRegistry.instance.register(bgt_object, finalizer)
end
def register_pool(pool)
finalizer = lambda do |pool|
if !pool.closed?
pool.close(wait: true)
end
end
LocalResourceRegistry.instance.register(pool, finalizer)
end
# Stop monitoring threads on the specified clients, after ensuring
# each client has a writable server. Used for tests which assert on
# global side effects like log messages being generated, to prevent
# background threads from interfering with assertions.
def stop_monitoring(*clients)
clients.each do |client|
client.cluster.next_primary
client.cluster.close
# We have tests that stop monitoring to reduce the noise happening in
# background. These tests perform operations which requires the pools
# to function. See also RUBY-3102.
client.cluster.servers_list.each do |server|
if pool = server.pool
pool.instance_variable_set('@closed', false)
# Stop the populator so that we don't have leftover threads.
pool.instance_variable_get('@populator').stop!
end
end
end
end
DNS_INTERFACES = [
[:udp, "0.0.0.0", 5300],
[:tcp, "0.0.0.0", 5300],
]
# Starts the DNS server and returns it; should be run from within an
# Async block. Prefer #mock_dns instead, which does the setup for you.
def start_dns_server(config)
RubyDNS::run_server(DNS_INTERFACES) do
config.each do |(query, type, *answers)|
resource_cls = Resolv::DNS::Resource::IN.const_get(type.to_s.upcase)
resources = answers.map do |answer|
resource_cls.new(*answer)
end
match(query, resource_cls) do |req|
req.add(resources)
end
end
end
end
# Starts and runs a DNS server, then yields to the attached block.
def mock_dns(config)
# only require rubydns when we need it; it's MRI-only.
require 'rubydns'
Async do |task|
server = start_dns_server(config)
yield
ensure
server.stop
end
end
# Wait for snapshot reads to become available to prevent this error:
# [246:SnapshotUnavailable]: Unable to read from a snapshot due to pending collection catalog changes; please retry the operation. Snapshot timestamp is Timestamp(1646666892, 4). Collection minimum is Timestamp(1646666892, 5) (on localhost:27017, modern retry, attempt 1)
def wait_for_snapshot(db: nil, collection: nil, client: nil)
client ||= authorized_client
client = client.use(db) if db
collection ||= 'any'
start_time = Mongo::Utils.monotonic_time
begin
client.start_session(snapshot: true) do |session|
client[collection].aggregate([{'$match': {any: true}}], session: session).to_a
end
rescue Mongo::Error::OperationFailure::Family => e
# Retry them as the server demands...
if e.code == 246 # SnapshotUnavailable
if Mongo::Utils.monotonic_time < start_time + 10
retry
end
end
raise
end
end
# Make the server usable for operations after it was marked closed.
# Used for tests that e.g. mock network operations to avoid interference
# from server monitoring.
def reset_pool(server)
if pool = server.pool_internal
pool.close
end
server.remove_instance_variable('@pool')
server.pool.ready
end
end
end
|