File: read_worker.rb

package info (click to toggle)
ruby-mongo 2.21.3-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 14,764 kB
  • sloc: ruby: 108,806; makefile: 5; sh: 2
file content (338 lines) | stat: -rw-r--r-- 13,841 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
# frozen_string_literal: true
# rubocop:todo all

# Copyright (C) 2015-2023 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'mongo/retryable/base_worker'

module Mongo
  module Retryable

    # Implements the logic around retrying read operations.
    #
    # @api private
    #
    # @since 2.19.0
    class ReadWorker < BaseWorker
      # Execute a read operation returning a cursor with retrying.
      #
      # This method performs server selection for the specified server selector
      # and yields to the provided block, which should execute the initial
      # query operation and return its result. The block will be passed the
      # server selected for the operation. If the block raises an exception,
      # and this exception corresponds to a read retryable error, and read
      # retries are enabled for the client, this method will perform server
      # selection again and yield to the block again (with potentially a
      # different server). If the block returns successfully, the result
      # of the block (which should be a Mongo::Operation::Result) is used to
      # construct a Mongo::Cursor object for the result set. The cursor
      # is then returned.
      #
      # If modern retry reads are on (which is the default), the initial read
      # operation will be retried once. If legacy retry reads are on, the
      # initial read operation will be retried zero or more times depending
      # on the :max_read_retries client setting, the default for which is 1.
      # To disable read retries, turn off modern read retries by setting
      # retry_reads: false and set :max_read_retries to 0 on the client.
      #
      # @api private
      #
      # @example Execute a read returning a cursor.
      #   cursor = read_with_retry_cursor(session, server_selector, view) do |server|
      #     # return a Mongo::Operation::Result
      #     ...
      #   end
      #
      # @param [ Mongo::Session ] session The session that the operation is being
      #   run on.
      # @param [ Mongo::ServerSelector::Selectable ] server_selector Server
      #   selector for the operation.
      # @param [ CollectionView ] view The +CollectionView+ defining the query.
      # @param [ Operation::Context | nil ] context the operation context to use
      #   with the cursor.
      # @param [ Proc ] block The block to execute.
      #
      # @return [ Cursor ] The cursor for the result set.
      def read_with_retry_cursor(session, server_selector, view, context: nil, &block)
        # Server selection and retry handling are delegated to read_with_retry;
        # every attempt runs the caller's block and wraps its result in a cursor.
        read_with_retry(session, server_selector, context) do |server|
          result = yield server

          # RUBY-2367: This will be updated to allow the query cache to
          # cache cursors with multi-batch results.
          # A caching cursor is used only while the query cache is enabled and
          # the view does not target a system collection.
          use_cache = QueryCache.enabled? && !view.collection.system_collection?
          cursor_class = use_cache ? CachingCursor : Cursor
          cursor_class.new(view, result, server, session: session, context: context)
        end
      end

      # Execute a read operation with retrying.
      #
      # This method performs server selection for the specified server selector
      # and yields to the provided block, which should execute the initial
      # query operation and return its result. The block will be passed the
      # server selected for the operation. If the block raises an exception,
      # and this exception corresponds to a read retryable error, and read
      # retries are enabled for the client, this method will perform server
      # selection again and yield to the block again (with potentially a
      # different server). If the block returns successfully, the result
      # of the block is returned.
      #
      # If modern retry reads are on (which is the default), the initial read
      # operation will be retried once. If legacy retry reads are on, the
      # initial read operation will be retried zero or more times depending
      # on the :max_read_retries client setting, the default for which is 1.
      # To disable read retries, turn off modern read retries by setting
      # retry_reads: false and set :max_read_retries to 0 on the client.
      #
      # @api private
      #
      # @example Execute the read.
      #   read_with_retry(session, server_selector) do |server|
      #     ...
      #   end
      #
      # @param [ Mongo::Session | nil ] session The session that the operation
      #   is being run on.
      # @param [ Mongo::ServerSelector::Selectable | nil ] server_selector
      #   Server selector for the operation.
      # @param [ Mongo::Operation::Context | nil ] context Context for the
      #   read operation.
      # @param [ Proc ] block The block to execute.
      #
      # @return [ Result ] The result of the operation.
      def read_with_retry(session = nil, server_selector = nil, context = nil, &block)
        # Neither a session nor a server selector: the deprecated call style.
        return deprecated_legacy_read_with_retry(&block) if session.nil? && server_selector.nil?

        # The session reports that modern retryable reads are in effect.
        return modern_read_with_retry(session, server_selector, context, &block) if session&.retry_reads?

        # Legacy retries apply while the client allows at least one read retry.
        return legacy_read_with_retry(session, server_selector, context, &block) if client.max_read_retries > 0

        # No retry strategy applies; run the block exactly once.
        read_without_retry(session, server_selector, &block)
      end

      # Execute a read operation with a single retry on network errors.
      #
      # This method is used by the driver for some of the internal housekeeping
      # operations. Application-requested reads should use read_with_retry
      # rather than this method.
      #
      # @api private
      #
      # @example Execute the read.
      #   read_with_one_retry do
      #     ...
      #   end
      #
      # @note This only retries read operations on socket errors.
      #
      # @param [ Hash | nil ] options Options.
      #
      # @option options [ String ] :retry_message Message to log when retrying.
      #
      # @yield Calls the provided block with no arguments
      #
      # @return [ Result ] The result of the operation.
      #
      # @since 2.2.6
      def read_with_one_retry(options = nil)
        begin
          yield
        rescue *retryable_exceptions, Error::PoolError => failure
          if failure.write_retryable?
            log_retry(failure, message: options && options[:retry_message])
            # Second and final attempt; an error raised here goes to the caller.
            yield
          else
            # Errors not flagged as write-retryable propagate unchanged.
            raise failure
          end
        end
      end

      private

      # Attempts to do a legacy read_with_retry, without either a session or
      # server_selector. This is a deprecated use-case, and a warning will be
      # issued the first time this is invoked.
      #
      # @param [ Proc ] block The block to execute.
      #
      # @return [ Result ] The result of the operation.
      def deprecated_legacy_read_with_retry(&block)
        deprecation_warning(
          :read_with_retry,
          'Legacy read_with_retry invocation - please update the application and/or its dependencies'
        )

        # Without a session the modern retry path is unavailable, and without
        # a caller-supplied selector one has to be chosen here. PrimaryPreferred
        # works as long as the cluster has a data bearing node; the block is
        # free to pick a different server itself.
        fallback_selector = ServerSelector.get(mode: :primary_preferred)
        legacy_read_with_retry(nil, fallback_selector, &block)
      end

      # Attempts to do a "modern" read with retry. Only a single retry will
      # be attempted.
      #
      # @param [ Mongo::Session ] session The session that the operation is
      #   being run on.
      # @param [ Mongo::ServerSelector::Selectable ] server_selector Server
      #   selector for the operation.
      # @param [ Mongo::Operation::Context ] context Context for the
      #   read operation.
      # @param [ Proc ] block The block to execute.
      #
      # @return [ Result ] The result of the operation.
      def modern_read_with_retry(session, server_selector, context, &block)
        begin
          server = select_server(cluster, server_selector, session, timeout: context&.remaining_timeout_sec)
          yield server
        rescue *retryable_exceptions, Error::OperationFailure::Family, Auth::Unauthorized, Error::PoolError => error
          error.add_notes('modern retry', 'attempt 1')

          # Reads inside a transaction are not retried here.
          raise error if session.in_transaction?
          # Anything that is neither a retryable exception nor flagged as
          # write-retryable is surfaced as-is.
          raise error unless is_retryable_exception?(error) || error.write_retryable?

          # `server` is nil when the failure happened during server selection.
          retry_read(error, session, server_selector, context: context, failed_server: server, &block)
        end
      end

      # Attempts to do a "legacy" read with retry. The operation will be
      # attempted multiple times, up to the client's `max_read_retries`
      # setting.
      #
      # @param [ Mongo::Session ] session The session that the operation is
      #   being run on.
      # @param [ Mongo::ServerSelector::Selectable ] server_selector Server
      #   selector for the operation.
      # @param [ Mongo::Operation::Context | nil ] context Context for the
      #   read operation.
      # @param [ Proc ] block The block to execute.
      #
      # @return [ Result ] The result of the operation.
      def legacy_read_with_retry(session, server_selector, context = nil, &block)
        # Counts passes through the begin block; it keeps its value across
        # `retry`, and stays nil until the first timeout check has passed.
        attempt = nil

        begin
          context&.check_timeout!
          attempt = attempt.to_i + 1
          yield select_server(cluster, server_selector, session)
        rescue *legacy_retryable_exceptions, Error::OperationFailure::Family => error
          error.add_notes('legacy retry', "attempt #{attempt}")

          give_up =
            if is_legacy_retryable_exception?(error)
              attempt > client.max_read_retries || session&.in_transaction?
            elsif error.retryable? && !session&.in_transaction?
              attempt > client.max_read_retries
            else
              true
            end
          raise error if give_up

          log_retry(error, message: 'Legacy read retry')
          # Pause before the next attempt unless is_retryable_exception?
          # recognises the error.
          sleep(client.read_retry_interval) unless is_retryable_exception?(error)
          retry
        end
      end

      # Attempts to do a read *without* a retry; for example, when retries have
      # been explicitly disabled.
      #
      # @param [ Mongo::Session ] session The session that the operation is
      #   being run on.
      # @param [ Mongo::ServerSelector::Selectable ] server_selector Server
      #   selector for the operation.
      # @param [ Proc ] block The block to execute.
      #
      # @return [ Result ] The result of the operation.
      def read_without_retry(session, server_selector, &block)
        # Selection happens outside the rescue scope below, so selection
        # failures propagate without the extra note.
        target = select_server(cluster, server_selector, session)

        begin
          yield target
        rescue *retryable_exceptions, Error::PoolError, Error::OperationFailure::Family => failure
          # Record on the error that no retry was attempted, then re-raise it.
          failure.add_note('retries disabled')
          raise failure
        end
      end

      # The retry logic of the "modern" read_with_retry implementation.
      #
      # @param [ Exception ] original_error The original error that triggered
      #   the retry.
      # @param [ Mongo::Session ] session The session that the operation is
      #   being run on.
      # @param [ Mongo::ServerSelector::Selectable ] server_selector Server
      #   selector for the operation.
      # @param [ Mongo::Operation::Context | nil ] :context Context for the
      #   read operation.
      # @param [ Mongo::Server | nil ] :failed_server The server on which the original
      #   operation failed.
      # @param [ Proc ] block The block to execute.
      #
      # @return [ Result ] The result of the operation.
      def retry_read(original_error, session, server_selector, context: nil, failed_server: nil, &block)
        server = select_server_for_retry(
          original_error, session, server_selector, context, failed_server
        )

        log_retry(original_error, message: 'Read retry')

        begin
          context&.check_timeout!
          # `attempt` keeps its value across `retry`, so it counts 2, 3, ...
          # (the initial read that failed was attempt 1).
          attempt = attempt ? attempt + 1 : 2
          yield server, true
        rescue Error::TimeoutError
          # Timeouts are never retried.
          raise
        rescue *retryable_exceptions => e
          e.add_notes('modern retry', "attempt #{attempt}")
          raise e unless context&.csot?

          # `retry` re-runs only the begin block above, so the next server has
          # to be chosen here, passing the server that just failed so that it
          # can be avoided. If selection fails, original_error is raised.
          server = select_server_for_retry(
            original_error, session, server_selector, context, server
          )
          retry
        rescue Error::OperationFailure::Family, Error::PoolError => e
          e.add_note('modern retry')
          unless e.write_retryable?
            # Report the first failure, annotated with what went wrong later.
            original_error.add_note("later retry failed: #{e.class}: #{e}")
            raise original_error
          end

          e.add_note("attempt #{attempt}")
          raise e unless context&.csot?

          # Same as above: pick a new server before running the block again.
          server = select_server_for_retry(
            original_error, session, server_selector, context, server
          )
          retry
        rescue Error, Error::AuthError => e
          e.add_note('modern retry')
          original_error.add_note("later retry failed: #{e.class}: #{e}")
          raise original_error
        end
      end

      # Selects the server to run a retried read on.
      #
      # @param [ Exception ] original_error The error that triggered the
      #   retry; it is raised (with a note added) if selection fails.
      # @param [ Mongo::Session ] session The session that the operation is
      #   being run on.
      # @param [ Mongo::ServerSelector::Selectable ] server_selector Server
      #   selector for the operation.
      # @param [ Mongo::Operation::Context | nil ] context Context for the
      #   read operation; supplies the selection timeout when present.
      # @param [ Mongo::Server | nil ] failed_server The server on which the
      #   previous attempt failed; handed on to select_server.
      #
      # @return [ Mongo::Server ] The selected server.
      def select_server_for_retry(original_error, session, server_selector, context, failed_server)
        begin
          remaining = context&.remaining_timeout_sec
          select_server(cluster, server_selector, session, failed_server, timeout: remaining)
        rescue Error, Error::AuthError => selection_error
          # Surface the original failure, recording why the retry never ran.
          original_error.add_note("later retry failed: #{selection_error.class}: #{selection_error}")
          raise original_error
        end
      end
    end
  end
end