File: iterable.rb

package info (click to toggle)
ruby-mongo 2.23.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 15,020 kB
  • sloc: ruby: 110,810; makefile: 5
file content (240 lines) | stat: -rw-r--r-- 8,115 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
# frozen_string_literal: true
# rubocop:todo all

# Copyright (C) 2014-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'mongo/cursor_host'

module Mongo
  class Collection
    class View

      # Defines iteration related behavior for collection views, including
      # cursor instantiation.
      #
      # @since 2.0.0
      module Iterable
        include Mongo::CursorHost

        # Iterate through documents returned by a query with this +View+.
        #
        # @example Iterate through the result of the view.
        #   view.each do |document|
        #     p document
        #   end
        #
        # @return [ Enumerator ] The enumerator.
        #
        # @since 2.0.0
        #
        # @yieldparam [ Hash ] Each matching document.
        def each
          @cursor = prefer_cached_cursor? ? cached_cursor : new_cursor_for_iteration
          return @cursor.to_enum unless block_given?

          limit_for_cached_query = compute_limit_for_cached_query

          # Ruby versions 2.5 and older do not support arr[0..nil] syntax, so
          # this must be a separate conditional.
          cursor_to_iterate = if limit_for_cached_query
            @cursor.to_a[0...limit_for_cached_query]
          else
            @cursor
          end

          cursor_to_iterate.each do |doc|
            yield doc
          end
        end

        # Cleans up resources associated with this query.
        #
        # If there is a server cursor associated with this query, it is
        # closed by sending a KillCursors command to the server.
        #
        # @note This method propagates any errors that occur when closing the
        #   server-side cursor.
        #
        # @return [ nil ] Always nil.
        #
        # @raise [ Error::OperationFailure::Family ] If the server cursor close fails.
        #
        # @since 2.1.0
        def close_query
          if @cursor
            @cursor.close
          end
        end
        alias :kill_cursors :close_query

        private

        def select_cursor(session)
          context = Operation::Context.new(
            client: client,
            session: session,
            operation_timeouts: operation_timeouts,
            view: self
          )
          op = initial_query_op(session)
          tracer.trace_operation(op, context) do
            if respond_to?(:write?, true) && write?
              server = server_selector.select_server(cluster, nil, session, write_aggregation: true)
              result = send_initial_query(server, context, operation: op)

              if use_query_cache?
                CachingCursor.new(view, result, server, session: session, context: context)
              else
                Cursor.new(view, result, server, session: session, context: context)
              end
            else
              read_with_retry_cursor(session, server_selector, view, context: context) do |server|
                send_initial_query(server, context, operation: op)
              end
            end
          end
        end

        def cached_cursor
          QueryCache.get(**cache_options)
        end

        def cache_options
          # NB: this hash is passed as keyword argument and must have symbol
          # keys.
          {
            namespace: collection.namespace,
            selector: selector,
            skip: skip,
            sort: sort,
            limit: limit,
            projection: projection,
            collation: collation,
            read_concern: read_concern,
            read_preference: read_preference,
          }
        end

        def initial_query_op(session)
          spec = {
            coll_name: collection.name,
            filter: filter,
            projection: projection,
            db_name: database.name,
            session: session,
            collation: collation,
            sort: sort,
            skip: skip,
            let: options[:let],
            limit: limit,
            allow_disk_use: options[:allow_disk_use],
            allow_partial_results: options[:allow_partial_results],
            read: read,
            read_concern: options[:read_concern] || read_concern,
            batch_size: batch_size,
            hint: options[:hint],
            max_scan: options[:max_scan],
            max_value: options[:max_value],
            min_value: options[:min_value],
            no_cursor_timeout: options[:no_cursor_timeout],
            return_key: options[:return_key],
            show_disk_loc: options[:show_disk_loc],
            comment: options[:comment],
            oplog_replay: oplog_replay
          }

          if spec[:oplog_replay]
            collection.client.log_warn("The :oplog_replay option is deprecated and ignored by MongoDB 4.4 and later")
          end

          maybe_set_tailable_options(spec)

          if explained?
            spec[:explain] = options[:explain]
            Operation::Explain.new(spec)
          else
            Operation::Find.new(spec)
          end
        end

        def send_initial_query(server, context, operation: nil)
          operation ||= initial_query_op(context.session)
          if server.load_balancer?
            # Connection will be checked in when cursor is drained.
            connection = server.pool.check_out(context: context)
            operation.execute_with_connection(connection, context: context)
          else
            operation.execute(server, context: context)
          end
        end

        def use_query_cache?
          QueryCache.enabled? && !collection.system_collection?
        end

        # If the caching cursor is closed and was not fully iterated,
        # the documents we have in it are not the complete result set and
        # we have no way of completing that iteration.
        # Therefore, discard that cursor and start iteration again.
        def prefer_cached_cursor?
          use_query_cache? &&
            cached_cursor &&
            (cached_cursor.fully_iterated? || !cached_cursor.closed?)
        end

        # Start a new cursor for use when iterating (via #each).
        def new_cursor_for_iteration
          session = client.get_session(@options)
          select_cursor(session).tap do |cursor|
            if use_query_cache?
              # No need to store the cursor in the query cache if there is
              # already a cached cursor stored at this key.
              QueryCache.set(cursor, **cache_options)
            end
          end
        end

        def compute_limit_for_cached_query
          return nil unless use_query_cache? && respond_to?(:limit)

          # If a query with a limit is performed, the query cache will
          # re-use results from an earlier query with the same or larger
          # limit, and then impose the lower limit during iteration.
          return QueryCache.normalized_limit(limit)
        end

        # Add tailable cusror options to the command specifiction if needed.
        #
        # @param [ Hash ] spec The command specification.
        def maybe_set_tailable_options(spec)
          case cursor_type
          when :tailable
            spec[:tailable] = true
          when :tailable_await
            spec[:tailable] = true
            spec[:await_data] = true
          end
        end

        # @return [ true | false | nil ] options[:oplog_replay], if
        #    set, otherwise the same option from the collection.
        def oplog_replay
          v = options[:oplog_replay]
          v.nil? ? collection.options[:oplog_replay] : v
        end
      end
    end
  end
end