1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
|
# Copyright (C) 2017 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
module Mongo
class Session
# A pool of server sessions.
#
# @api private
#
# @since 2.5.0
class SessionPool
# Create a SessionPool.
#
# @example
# SessionPool.create(cluster)
#
# @param [ Mongo::Cluster ] cluster The cluster that will be associated with this
# session pool.
#
# @since 2.5.0
def self.create(cluster)
pool = new(cluster)
cluster.instance_variable_set(:@session_pool, pool)
end
# Initialize a SessionPool.
#
# @example
# SessionPool.new(cluster)
#
# @param [ Mongo::Cluster ] cluster The cluster that will be associated with this
# session pool.
#
# @since 2.5.0
def initialize(cluster)
@queue = []
@mutex = Mutex.new
@cluster = cluster
end
# Get a formatted string for use in inspection.
#
# @example Inspect the session pool object.
# session_pool.inspect
#
# @return [ String ] The session pool inspection.
#
# @since 2.5.0
def inspect
"#<Mongo::Session::SessionPool:0x#{object_id} current_size=#{@queue.size}>"
end
# Checkout a server session from the pool.
#
# @example Checkout a session.
# pool.checkout
#
# @return [ ServerSession ] The server session.
#
# @since 2.5.0
def checkout
@mutex.synchronize do
loop do
if @queue.empty?
return ServerSession.new
else
session = @queue.shift
unless about_to_expire?(session)
return session
end
end
end
end
end
# Checkin a server session to the pool.
#
# @example Checkin a session.
# pool.checkin(session)
#
# @param [ Session::ServerSession ] session The session to checkin.
#
# @since 2.5.0
def checkin(session)
@mutex.synchronize do
prune!
unless about_to_expire?(session)
@queue.unshift(session)
end
end
end
# End all sessions in the pool by sending the endSessions command to the server.
#
# @example End all sessions.
# pool.end_sessions
#
# @since 2.5.0
def end_sessions
while !@queue.empty?
server = ServerSelector.get(mode: :primary_preferred).select_server(@cluster)
Operation::Commands::Command.new(
:selector => {endSessions: @queue.shift(10_000).collect { |s| s.session_id }},
:db_name => Database::ADMIN).execute(server)
end
rescue
end
private
def about_to_expire?(session)
if @cluster.logical_session_timeout
idle_time_minutes = (Time.now - session.last_use) / 60
(idle_time_minutes + 1) >= @cluster.logical_session_timeout
end
end
def prune!
while !@queue.empty?
if about_to_expire?(@queue[-1])
@queue.pop
else
break
end
end
end
end
end
end
|