1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398
|
# frozen_string_literal: true
# rubocop:todo all
# Copyright (C) 2015-2023 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require 'mongo/retryable/base_worker'
module Mongo
module Retryable
# Implements the logic around retrying write operations.
#
# @api private
#
# @since 2.19.0
class WriteWorker < BaseWorker
# Implements write retrying functionality by yielding to the passed
# block one or more times.
#
# If the session is provided (hence, the deployment supports sessions),
# and modern retry writes are enabled on the client, the modern retry
# logic is invoked. Otherwise the legacy retry logic is invoked.
#
# If ending_transaction parameter is true, indicating that a transaction
# is being committed or aborted, the operation is executed exactly once.
# Note that, since transactions require sessions, this method will raise
# ArgumentError if ending_transaction is true and session is nil.
#
# @api private
#
# @example Execute the write.
# write_with_retry do
# ...
# end
#
# @note This only retries operations on not master failures, since it is
# the only case we can be sure a partial write did not already occur.
#
# @param [ nil | Hash | WriteConcern::Base ] write_concern The write concern.
# @param [ true | false ] ending_transaction True if the write operation is
# abortTransaction or commitTransaction, false otherwise.
# @param [ Context ] context The context for the operation.
# @param [ Proc ] block The block to execute.
#
# @yieldparam [ Connection ] connection The connection through which the
# write should be sent.
# @yieldparam [ Integer ] txn_num Transaction number (NOT the ACID kind).
# @yieldparam [ Operation::Context ] context The operation context.
#
# @return [ Result ] The result of the operation.
#
# @since 2.1.0
def write_with_retry(write_concern, ending_transaction: false, context:, &block)
session = context.session
ensure_valid_state!(ending_transaction, session)
unless ending_transaction || retry_write_allowed?(session, write_concern)
return legacy_write_with_retry(nil, context: context, &block)
end
# If we are here, session is not nil. A session being nil would have
# failed retry_write_allowed? check.
server = select_server(
cluster, ServerSelector.primary,
session,
timeout: context.remaining_timeout_sec
)
unless ending_transaction || server.retry_writes?
return legacy_write_with_retry(server, context: context, &block)
end
modern_write_with_retry(session, server, context, &block)
end
# Retryable writes wrapper for operations not supporting modern retryable
# writes.
#
# If the driver is configured to use modern retryable writes, this method
# yields to the passed block exactly once, thus not retrying any writes.
#
# If the driver is configured to use legacy retryable writes, this method
# delegates to legacy_write_with_retry which performs write retries using
# legacy logic.
#
# @param [ nil | Hash | WriteConcern::Base ] write_concern The write concern.
# @param [ Context ] context The context for the operation.
#
# @yieldparam [ Connection ] connection The connection through which the
# write should be sent.
# @yieldparam [ nil ] txn_num nil as transaction number.
# @yieldparam [ Operation::Context ] context The operation context.
def nro_write_with_retry(write_concern, context:, &block)
session = context.session
server = select_server(cluster, ServerSelector.primary, session)
options = session&.client&.options || {}
if options[:retry_writes]
begin
server.with_connection(connection_global_id: context.connection_global_id) do |connection|
yield connection, nil, context
end
rescue *retryable_exceptions, Error::PoolError, Error::OperationFailure::Family => e
e.add_note('retries disabled')
raise e
end
else
legacy_write_with_retry(server, context: context, &block)
end
end
# Queries whether the session and write concern support retrying writes.
#
# @param [ Mongo::Session ] session The session that the operation is
# being run on.
# @param [ nil | Hash | WriteConcern::Base ] write_concern The write
# concern.
#
# @return [ true | false ] Whether write retries are allowed or not.
def retry_write_allowed?(session, write_concern)
return false unless session&.retry_writes?
if write_concern.nil?
true
else
WriteConcern.get(write_concern).acknowledged?
end
end
private
# Makes sure the state of the arguments is consistent and valid.
#
# @param [ true | false ] ending_transaction True if the write operation
# is abortTransaction or commitTransaction, false otherwise.
# @param [ nil | Mongo::Session ] session The session that the operation
# is being run on (if any).
def ensure_valid_state!(ending_transaction, session)
if ending_transaction && !session
raise ArgumentError, 'Cannot end a transaction without a session'
end
end
# Implements legacy write retrying functionality by yielding to the passed
# block one or more times.
#
# This method is used for operations which are not supported by modern
# retryable writes, such as delete_many and update_many.
#
# @param [ Server ] server The server which should be used for the
# operation. If not provided, the current primary will be retrieved from
# the cluster.
# @param [ Context ] context The context for the operation.
#
# @yieldparam [ Connection ] connection The connection through which the
# write should be sent.
# @yieldparam [ nil ] txn_num nil as transaction number.
# @yieldparam [ Operation::Context ] context The operation context.
#
# @api private
def legacy_write_with_retry(server = nil, context:)
session = context.session
context.check_timeout!
# This is the pre-session retry logic, and is not subject to
# current retryable write specifications.
# In particular it does not retry on SocketError and SocketTimeoutError.
attempt = 0
begin
attempt += 1
server ||= select_server(
cluster,
ServerSelector.primary,
session,
timeout: context.remaining_timeout_sec
)
server.with_connection(
connection_global_id: context.connection_global_id,
context: context
) do |connection|
# Legacy retries do not use txn_num
yield connection, nil, context.dup
end
rescue Error::OperationFailure::Family => e
e.add_note('legacy retry')
e.add_note("attempt #{attempt}")
server = nil
if attempt > client.max_write_retries
raise e
end
if e.label?('RetryableWriteError')
log_retry(e, message: 'Legacy write retry')
cluster.scan!(false)
retry
else
raise e
end
end
end
# Implements modern write retrying functionality by yielding to the passed
# block no more than twice.
#
# @param [ Mongo::Session ] session The session that the operation is
# being run on.
# @param [ Server ] server The server which should be used for the
# operation.
# @param [ Operation::Context ] context The context for the operation.
#
# @yieldparam [ Connection ] connection The connection through which the
# write should be sent.
# @yieldparam [ Integer ] txn_num Transaction number (NOT the ACID kind).
# @yieldparam [ Operation::Context ] context The operation context.
#
# @return [ Result ] The result of the operation.
#
# @api private
def modern_write_with_retry(session, server, context, &block)
txn_num = nil
connection_succeeded = false
server.with_connection(
connection_global_id: context.connection_global_id,
context: context
) do |connection|
connection_succeeded = true
session.materialize_if_needed
txn_num = session.in_transaction? ? session.txn_num : session.next_txn_num
# The context needs to be duplicated here because we will be using
# it later for the retry as well.
yield connection, txn_num, context.dup
end
rescue *retryable_exceptions, Error::PoolError, Auth::Unauthorized, Error::OperationFailure::Family => e
e.add_notes('modern retry', 'attempt 1')
if e.is_a?(Error::OperationFailure::Family)
ensure_retryable!(e)
else
ensure_labeled_retryable!(e, connection_succeeded, session)
end
# Context#with creates a new context, which is not necessary here
# but the API is less prone to misuse this way.
retry_write(e, txn_num, context: context.with(is_retry: true), failed_server: server, &block)
end
# Called after a failed write, this will retry the write no more than
# once.
#
# @param [ Exception ] original_error The exception that triggered the
# retry.
# @param [ Number ] txn_num The transaction number.
# @param [ Operation::Context ] context The context for the operation.
# @param [ Mongo::Server ] failed_server The server on which the original
# operation failed.
#
# @return [ Result ] The result of the operation.
def retry_write(original_error, txn_num, context:, failed_server: nil, &block)
context&.check_timeout!
session = context.session
# We do not request a scan of the cluster here, because error handling
# for the error which triggered the retry should have updated the
# server description and/or topology as necessary (specifically,
# a socket error or a not master error should have marked the respective
# server unknown). Here we just need to wait for server selection.
server = select_server(
cluster,
ServerSelector.primary,
session,
failed_server,
timeout: context.remaining_timeout_sec
)
unless server.retry_writes?
# Do not need to add "modern retry" here, it should already be on
# the first exception.
original_error.add_note('did not retry because server selected for retry does not support retryable writes')
# When we want to raise the original error, we must not run the
# rescue blocks below that add diagnostics because the diagnostics
# added would either be rendundant (e.g. modern retry note) or wrong
# (e.g. "attempt 2", we are raising the exception produced in the
# first attempt and haven't attempted the second time). Use the
# special marker class to bypass the ordinarily applicable rescues.
raise Error::RaiseOriginalError
end
attempt = attempt ? attempt + 1 : 2
log_retry(original_error, message: 'Write retry')
server.with_connection(connection_global_id: context.connection_global_id) do |connection|
yield(connection, txn_num, context)
end
rescue *retryable_exceptions, Error::PoolError => e
maybe_fail_on_retryable(e, original_error, context, attempt)
failed_server = server
retry
rescue Error::OperationFailure::Family => e
maybe_fail_on_operation_failure(e, original_error, context, attempt)
failed_server = server
retry
rescue Mongo::Error::TimeoutError
raise
rescue Error, Error::AuthError => e
fail_on_other_error!(e, original_error)
rescue Error::RaiseOriginalError
raise original_error
end
# Retry writes on MMAPv1 should raise an actionable error; append actionable
# information to the error message and preserve the backtrace.
def raise_unsupported_error(e)
new_error = Error::OperationFailure.new("#{e.class}: #{e} "\
"This MongoDB deployment does not support retryable writes. Please add "\
"retryWrites=false to your connection string or use the retry_writes: false Ruby client option")
new_error.set_backtrace(e.backtrace)
raise new_error
end
# Make sure the exception object is labeled 'RetryableWriteError'. If it
# isn't, and should not be, re-raise the exception.
def ensure_labeled_retryable!(e, connection_succeeded, session)
if !e.label?('RetryableWriteError')
# If there was an error before the connection was successfully
# checked out and connected, there was no connection present to use
# for adding labels. Therefore, we should check if it is retryable,
# and if it is, add the label and retry it.
if !connection_succeeded && !session.in_transaction? && e.write_retryable?
e.add_label('RetryableWriteError')
else
raise e
end
end
end
# Make sure the exception object supports retryable writes. If it does,
# make sure it has been appropriately labeled. If either condition fails,
# raise an exception.
def ensure_retryable!(e)
if e.unsupported_retryable_write?
raise_unsupported_error(e)
elsif !e.label?('RetryableWriteError')
raise e
end
end
# Raise either e, or original_error, depending on whether e is
# write_retryable.
def maybe_fail_on_retryable(e, original_error, context, attempt)
if e.write_retryable?
e.add_notes('modern retry', "attempt #{attempt}")
raise e unless context&.deadline
else
original_error.add_note("later retry failed: #{e.class}: #{e}")
raise original_error
end
end
# Raise either e, or original_error, depending on whether e is
# appropriately labeled.
def maybe_fail_on_operation_failure(e, original_error, context, attempt)
e.add_note('modern retry')
if e.label?('RetryableWriteError') && !e.label?('NoWritesPerformed')
e.add_note("attempt #{attempt}")
raise e unless context&.deadline
else
original_error.add_note("later retry failed: #{e.class}: #{e}")
raise original_error
end
end
# Raise the original error (after annotating).
def fail_on_other_error!(e, original_error)
# Do not need to add "modern retry" here, it should already be on
# the first exception.
original_error.add_note("later retry failed: #{e.class}: #{e}")
raise original_error
end
end
end
end
|