File: sessions_supported.rb

package info (click to toggle)
ruby-mongo 2.21.3-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 14,764 kB
  • sloc: ruby: 108,806; makefile: 5; sh: 2
file content (273 lines) | stat: -rw-r--r-- 9,487 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
# frozen_string_literal: true
# rubocop:todo all

# Copyright (C) 2015-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo
  module Operation

    # Shared behavior of operations that support a session.
    #
    # @since 2.5.2
    # @api private
    module SessionsSupported

      private

      ZERO_TIMESTAMP = BSON::Timestamp.new(0, 0)

      READ_COMMANDS = [
        :aggregate,
        :count,
        :dbStats,
        :distinct,
        :find,
        :geoNear,
        :geoSearch,
        :group,
        :mapReduce,
        :parallelCollectionScan
      ].freeze

      # Adds causal consistency document to the selector, if one can be
      # constructed and the selector is for a startTransaction command.
      #
      # When operations are performed in a transaction, only the first
      # operation (the one which starts the transaction via startTransaction)
      # is allowed to have a read concern, and with it the causal consistency
      # document, specified.
      def apply_causal_consistency!(selector, connection)
        return unless selector[:startTransaction]

        apply_causal_consistency_if_possible(selector, connection)
      end

      # Adds causal consistency document to the selector, if one can be
      # constructed.
      #
      # In order for the causal consistency document to be constructed,
      # causal consistency must be enabled for the session and the session
      # must have the current operation time. Also, topology must be
      # replica set or sharded cluster.
      def apply_causal_consistency_if_possible(selector, connection)
        if !connection.description.standalone?
          cc_doc = session.send(:causal_consistency_doc)
          if cc_doc
            rc_doc = (selector[:readConcern] || read_concern || {}).merge(cc_doc)
            selector[:readConcern] = Options::Mapper.transform_values_to_strings(
              rc_doc)
          end
        end
      end

      def flags
        acknowledged_write? ? [] : [:more_to_come]
      end

      def apply_cluster_time!(selector, connection)
        if !connection.description.standalone?
          cluster_time = [
            connection.cluster_time,
            session&.cluster_time,
          ].compact.max

          if cluster_time
            selector['$clusterTime'] = cluster_time
          end
        end
      end

      def read_command?(sel)
        READ_COMMANDS.any? { |c| sel[c] }
      end

      def add_write_concern!(sel)
        sel[:writeConcern] = write_concern.options if write_concern
      end

      def apply_autocommit!(selector)
        session.add_autocommit!(selector)
      end

      def apply_start_transaction!(selector)
        session.add_start_transaction!(selector)
      end

      def apply_txn_num!(selector)
        session.add_txn_num!(selector)
      end

      def apply_read_pref!(selector)
        session.apply_read_pref!(selector) if read_command?(selector)
      end

      def apply_txn_opts!(selector)
        session.add_txn_opts!(selector, read_command?(selector), context)
      end

      def suppress_read_write_concern!(selector)
        session.suppress_read_write_concern!(selector)
      end

      def validate_read_preference!(selector)
        session.validate_read_preference!(selector) if read_command?(selector)
      end

      def command(connection)
        if Lint.enabled?
          unless connection.is_a?(Server::Connection)
            raise Error::LintError, "Connection is not a Connection instance: #{connection}"
          end
        end

        sel = BSON::Document.new(selector(connection))
        add_write_concern!(sel)
        sel[Protocol::Msg::DATABASE_IDENTIFIER] = db_name

        add_read_preference(sel, connection)

        if connection.features.sessions_enabled?
          apply_cluster_time!(sel, connection)
          if session && (acknowledged_write? || session.in_transaction?)
            apply_session_options(sel, connection)
          end
        elsif session && session.explicit?
          apply_session_options(sel, connection)
        end

        sel
      end

      # Adds $readPreference field to the command document.
      #
      # $readPreference is only sent when the server is a mongos,
      # following the rules described in
      # https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.md#passing-read-preference-to-mongos.
      # The topology does not matter for figuring out whether to send
      # $readPreference since the decision is always made based on
      # server type.
      #
      # $readPreference is sent to OP_MSG-grokking replica set members.
      #
      # @param [ Hash ] sel Existing command document which will be mutated.
      # @param [ Server::Connection ] connection The connection that the
      #   operation will be executed on.
      def add_read_preference(sel, connection)
        Lint.assert_type(connection, Server::Connection)

        # https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.md#topology-type-single
        read_doc = if connection.description.standalone?
          # Read preference is never sent to standalones.
          nil
        elsif connection.server.load_balancer?
          read&.to_mongos
        elsif connection.description.mongos?
          # When server is a mongos:
          # - $readPreference is never sent when mode is 'primary'
          # - Otherwise $readPreference is sent
          # When mode is 'secondaryPreferred' $readPreference is currently
          # required to only be sent when a non-mode field (i.e. tag_sets)
          # is present, but this causes wrong behavior (DRIVERS-1642).
          read&.to_mongos
        elsif connection.server.cluster.single?
          # In Single topology:
          # - If no read preference is specified by the application, the driver
          #   adds mode: primaryPreferred.
          # - If a read preference is specified by the application, the driver
          #   replaces the mode with primaryPreferred.
          read_doc = if read
            BSON::Document.new(read.to_doc)
          else
            BSON::Document.new
          end
          if [nil, 'primary'].include?(read_doc['mode'])
            read_doc['mode'] = 'primaryPreferred'
          end
          read_doc
        else
          # In replica sets, read preference is passed to the server if one
          # is specified by the application, except for primary read preferences.
          read_doc = BSON::Document.new(read&.to_doc || {})
          if [nil, 'primary'].include?(read_doc['mode'])
            nil
          else
            read_doc
          end
        end

        if read_doc
          sel['$readPreference'] = read_doc
        end
      end

      def apply_session_options(sel, connection)
        apply_cluster_time!(sel, connection)
        sel[:txnNumber] = BSON::Int64.new(txn_num) if txn_num
        sel.merge!(lsid: session.session_id)
        apply_start_transaction!(sel)
        apply_causal_consistency!(sel, connection)
        apply_autocommit!(sel)
        apply_txn_opts!(sel)
        suppress_read_write_concern!(sel)
        validate_read_preference!(sel)
        apply_txn_num!(sel)
        if session.recovery_token &&
          (sel[:commitTransaction] || sel[:abortTransaction])
        then
          sel[:recoveryToken] = session.recovery_token
        end

        if session.snapshot?
          unless connection.description.server_version_gte?('5.0')
            raise Error::SnapshotSessionInvalidServerVersion
          end

          sel[:readConcern] = {level: 'snapshot'}
          if session.snapshot_timestamp
            sel[:readConcern][:atClusterTime] = session.snapshot_timestamp
          end
        end
      end

      def build_message(connection, context)
        if self.session != context.session
          if self.session
            raise Error::InternalDriverError, "Operation session #{self.session.inspect} does not match context session #{context.session.inspect}"
          else
            # Some operations are not constructed with sessions but are
            # executed in a context where a session is available.
            # This could be OK or a driver issue.
            # TODO investigate.
          end
        end

        super.tap do |message|
          if session = context.session
            # Serialize the message to detect client-side problems,
            # such as invalid BSON keys or too large messages.
            # The message will be serialized again
            # later prior to being sent to the connection.
            buf = BSON::ByteBuffer.new
            message.serialize(buf)
            if buf.length > connection.max_message_size
              raise Error::MaxMessageSize.new(connection.max_message_size)
            end
            session.update_state!
          end
        end
      end
    end
  end
end