1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273
|
# frozen_string_literal: true
# rubocop:todo all
# Copyright (C) 2015-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
module Mongo
module Operation
# Shared behavior of operations that support a session.
#
# @since 2.5.2
# @api private
module SessionsSupported
private
ZERO_TIMESTAMP = BSON::Timestamp.new(0, 0)
READ_COMMANDS = [
:aggregate,
:count,
:dbStats,
:distinct,
:find,
:geoNear,
:geoSearch,
:group,
:mapReduce,
:parallelCollectionScan
].freeze
# Adds causal consistency document to the selector, if one can be
# constructed and the selector is for a startTransaction command.
#
# When operations are performed in a transaction, only the first
# operation (the one which starts the transaction via startTransaction)
# is allowed to have a read concern, and with it the causal consistency
# document, specified.
def apply_causal_consistency!(selector, connection)
return unless selector[:startTransaction]
apply_causal_consistency_if_possible(selector, connection)
end
# Adds causal consistency document to the selector, if one can be
# constructed.
#
# In order for the causal consistency document to be constructed,
# causal consistency must be enabled for the session and the session
# must have the current operation time. Also, topology must be
# replica set or sharded cluster.
def apply_causal_consistency_if_possible(selector, connection)
if !connection.description.standalone?
cc_doc = session.send(:causal_consistency_doc)
if cc_doc
rc_doc = (selector[:readConcern] || read_concern || {}).merge(cc_doc)
selector[:readConcern] = Options::Mapper.transform_values_to_strings(
rc_doc)
end
end
end
def flags
acknowledged_write? ? [] : [:more_to_come]
end
def apply_cluster_time!(selector, connection)
if !connection.description.standalone?
cluster_time = [
connection.cluster_time,
session&.cluster_time,
].compact.max
if cluster_time
selector['$clusterTime'] = cluster_time
end
end
end
def read_command?(sel)
READ_COMMANDS.any? { |c| sel[c] }
end
def add_write_concern!(sel)
sel[:writeConcern] = write_concern.options if write_concern
end
def apply_autocommit!(selector)
session.add_autocommit!(selector)
end
def apply_start_transaction!(selector)
session.add_start_transaction!(selector)
end
def apply_txn_num!(selector)
session.add_txn_num!(selector)
end
def apply_read_pref!(selector)
session.apply_read_pref!(selector) if read_command?(selector)
end
def apply_txn_opts!(selector)
session.add_txn_opts!(selector, read_command?(selector), context)
end
def suppress_read_write_concern!(selector)
session.suppress_read_write_concern!(selector)
end
def validate_read_preference!(selector)
session.validate_read_preference!(selector) if read_command?(selector)
end
def command(connection)
if Lint.enabled?
unless connection.is_a?(Server::Connection)
raise Error::LintError, "Connection is not a Connection instance: #{connection}"
end
end
sel = BSON::Document.new(selector(connection))
add_write_concern!(sel)
sel[Protocol::Msg::DATABASE_IDENTIFIER] = db_name
add_read_preference(sel, connection)
if connection.features.sessions_enabled?
apply_cluster_time!(sel, connection)
if session && (acknowledged_write? || session.in_transaction?)
apply_session_options(sel, connection)
end
elsif session && session.explicit?
apply_session_options(sel, connection)
end
sel
end
# Adds $readPreference field to the command document.
#
# $readPreference is only sent when the server is a mongos,
# following the rules described in
# https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.md#passing-read-preference-to-mongos.
# The topology does not matter for figuring out whether to send
# $readPreference since the decision is always made based on
# server type.
#
# $readPreference is sent to OP_MSG-grokking replica set members.
#
# @param [ Hash ] sel Existing command document which will be mutated.
# @param [ Server::Connection ] connection The connection that the
# operation will be executed on.
def add_read_preference(sel, connection)
Lint.assert_type(connection, Server::Connection)
# https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.md#topology-type-single
read_doc = if connection.description.standalone?
# Read preference is never sent to standalones.
nil
elsif connection.server.load_balancer?
read&.to_mongos
elsif connection.description.mongos?
# When server is a mongos:
# - $readPreference is never sent when mode is 'primary'
# - Otherwise $readPreference is sent
# When mode is 'secondaryPreferred' $readPreference is currently
# required to only be sent when a non-mode field (i.e. tag_sets)
# is present, but this causes wrong behavior (DRIVERS-1642).
read&.to_mongos
elsif connection.server.cluster.single?
# In Single topology:
# - If no read preference is specified by the application, the driver
# adds mode: primaryPreferred.
# - If a read preference is specified by the application, the driver
# replaces the mode with primaryPreferred.
read_doc = if read
BSON::Document.new(read.to_doc)
else
BSON::Document.new
end
if [nil, 'primary'].include?(read_doc['mode'])
read_doc['mode'] = 'primaryPreferred'
end
read_doc
else
# In replica sets, read preference is passed to the server if one
# is specified by the application, except for primary read preferences.
read_doc = BSON::Document.new(read&.to_doc || {})
if [nil, 'primary'].include?(read_doc['mode'])
nil
else
read_doc
end
end
if read_doc
sel['$readPreference'] = read_doc
end
end
def apply_session_options(sel, connection)
apply_cluster_time!(sel, connection)
sel[:txnNumber] = BSON::Int64.new(txn_num) if txn_num
sel.merge!(lsid: session.session_id)
apply_start_transaction!(sel)
apply_causal_consistency!(sel, connection)
apply_autocommit!(sel)
apply_txn_opts!(sel)
suppress_read_write_concern!(sel)
validate_read_preference!(sel)
apply_txn_num!(sel)
if session.recovery_token &&
(sel[:commitTransaction] || sel[:abortTransaction])
then
sel[:recoveryToken] = session.recovery_token
end
if session.snapshot?
unless connection.description.server_version_gte?('5.0')
raise Error::SnapshotSessionInvalidServerVersion
end
sel[:readConcern] = {level: 'snapshot'}
if session.snapshot_timestamp
sel[:readConcern][:atClusterTime] = session.snapshot_timestamp
end
end
end
def build_message(connection, context)
if self.session != context.session
if self.session
raise Error::InternalDriverError, "Operation session #{self.session.inspect} does not match context session #{context.session.inspect}"
else
# Some operations are not constructed with sessions but are
# executed in a context where a session is available.
# This could be OK or a driver issue.
# TODO investigate.
end
end
super.tap do |message|
if session = context.session
# Serialize the message to detect client-side problems,
# such as invalid BSON keys or too large messages.
# The message will be serialized again
# later prior to being sent to the connection.
buf = BSON::ByteBuffer.new
message.serialize(buf)
if buf.length > connection.max_message_size
raise Error::MaxMessageSize.new(connection.max_message_size)
end
session.update_state!
end
end
end
end
end
end
|