1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
|
# frozen_string_literal: true
# rubocop:todo all
# Copyright (C) 2017-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
module Mongo
class Session
# A pool of server sessions.
#
# @api private
#
# @since 2.5.0
class SessionPool
# Initialize a SessionPool.
#
# @example
# SessionPool.new(cluster)
#
# @param [ Mongo::Cluster ] cluster The cluster that will be associated with this
# session pool.
#
# @since 2.5.0
def initialize(cluster)
@queue = []
@mutex = Mutex.new
@cluster = cluster
end
# Get a formatted string for use in inspection.
#
# @example Inspect the session pool object.
# session_pool.inspect
#
# @return [ String ] The session pool inspection.
#
# @since 2.5.0
def inspect
"#<Mongo::Session::SessionPool:0x#{object_id} current_size=#{@queue.size}>"
end
# Check out a server session from the pool.
#
# @example Check out a session.
# pool.checkout
#
# @return [ ServerSession ] The server session.
#
# @since 2.5.0
def checkout
@mutex.synchronize do
loop do
if @queue.empty?
return ServerSession.new
else
session = @queue.shift
unless about_to_expire?(session)
return session
end
end
end
end
end
# Checkin a server session to the pool.
#
# @example Checkin a session.
# pool.checkin(session)
#
# @param [ Session::ServerSession ] session The session to checkin.
#
# @since 2.5.0
def checkin(session)
if session.nil?
raise ArgumentError, 'session cannot be nil'
end
@mutex.synchronize do
prune!
@queue.unshift(session) if return_to_queue?(session)
end
end
# End all sessions in the pool by sending the endSessions command to the server.
#
# @example End all sessions.
# pool.end_sessions
#
# @since 2.5.0
def end_sessions
while !@queue.empty?
server = ServerSelector.get(mode: :primary_preferred).select_server(@cluster)
op = Operation::Command.new(
selector: {
endSessions: @queue.shift(10_000).map(&:session_id),
},
db_name: Database::ADMIN,
)
context = Operation::Context.new(options: {
server_api: server.options[:server_api],
})
op.execute(server, context: context)
end
rescue Mongo::Error, Error::AuthError
end
private
# Query whether the given session is okay to return to the
# pool's queue.
#
# @param [ Session::ServerSession ] session the session to query
#
# @return [ true | false ] whether to return the session to the
# queue.
def return_to_queue?(session)
!session.dirty? && !about_to_expire?(session)
end
def about_to_expire?(session)
if session.nil?
raise ArgumentError, 'session cannot be nil'
end
# Load balancers spec explicitly requires to ignore the logical session
# timeout value.
# No rationale is provided as of the time of this writing.
if @cluster.load_balanced?
return false
end
logical_session_timeout = @cluster.logical_session_timeout
if logical_session_timeout
idle_time_minutes = (Time.now - session.last_use) / 60
(idle_time_minutes + 1) >= logical_session_timeout
end
end
def prune!
# Load balancers spec explicitly requires not to prune sessions.
# No rationale is provided as of the time of this writing.
return if @cluster.load_balanced?
while !@queue.empty?
if about_to_expire?(@queue[-1])
@queue.pop
else
break
end
end
end
end
end
end
|