File: behavior.rb

package info (click to toggle)
ruby-mongo 2.21.3-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 14,764 kB
  • sloc: ruby: 108,806; makefile: 5; sh: 2
file content (131 lines) | stat: -rw-r--r-- 4,328 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# frozen_string_literal: true

module Mongo
  class Collection
    class View
      class Aggregation
        # Distills the behavior common to aggregator classes, like
        # View::Aggregator and View::ChangeStream.
        module Behavior
          extend Forwardable
          include Enumerable
          include Immutable
          include Iterable
          include Explainable
          include Loggable
          include Retryable

          # @return [ View ] view The collection view.
          attr_reader :view

          # Delegate necessary operations to the view.
          def_delegators :view, :collection, :read, :cluster, :cursor_type, :limit, :batch_size

          # Delegate necessary operations to the collection.
          def_delegators :collection, :database, :client

          # Set to true if disk usage is allowed during the aggregation.
          #
          # @example Set disk usage flag.
          #   aggregation.allow_disk_use(true)
          #
          # @param [ true, false ] value The flag value.
          #
          # @return [ true, false, Aggregation ] The aggregation if a value was
          #   set or the value if used as a getter.
          #
          # @since 2.0.0
          def allow_disk_use(value = nil)
            configure(:allow_disk_use, value)
          end

          # Get the explain plan for the aggregation.
          #
          # @example Get the explain plan for the aggregation.
          #   aggregation.explain
          #
          # @return [ Hash ] The explain plan.
          #
          # @since 2.0.0
          def explain
            self.class.new(view, pipeline, options.merge(explain: true)).first
          end

          # Whether this aggregation will write its result to a database collection.
          #
          # @return [ Boolean ] Whether the aggregation will write its result
          #   to a collection.
          #
          # @api private
          def write?
            pipeline.any? { |op| op.key?('$out') || op.key?(:$out) || op.key?('$merge') || op.key?(:$merge) }
          end

          # @return [ Integer | nil ] the timeout_ms value that was passed as
          #   an option to this object, or which was inherited from the view.
          #
          # @api private
          def timeout_ms
            @timeout_ms || view.timeout_ms
          end

          private

          # Common setup for all classes that include this behavior; the
          # constructor should invoke this method.
          def perform_setup(view, options, forbid: [])
            @view = view

            @timeout_ms = options.delete(:timeout_ms)
            @options = BSON::Document.new(options).freeze

            yield

            validate_timeout_mode!(options, forbid: forbid)
          end

          def server_selector
            @view.send(:server_selector)
          end

          def aggregate_spec(session, read_preference)
            Builder::Aggregation.new(
              pipeline,
              view,
              options.merge(session: session, read_preference: read_preference)
            ).specification
          end

          # Skip, sort, limit, projection are specified as pipeline stages
          # rather than as options.
          def cache_options
            {
              namespace: collection.namespace,
              selector: pipeline,
              read_concern: view.read_concern,
              read_preference: view.read_preference,
              collation: options[:collation],
              # Aggregations can read documents from more than one collection,
              # so they will be cleared on every write operation.
              multi_collection: true,
            }
          end

          # @return [ Hash ] timeout_ms value set on the operation level (if any),
          #   and/or timeout_ms that is set on collection/database/client level (if any).
          #
          # @api private
          def operation_timeouts(opts = {})
            {}.tap do |result|
              if opts[:timeout_ms] || @timeout_ms
                result[:operation_timeout_ms] = opts.delete(:timeout_ms) || @timeout_ms
              else
                result[:inherited_timeout_ms] = view.timeout_ms
              end
            end
          end
        end
      end
    end
  end
end