# frozen_string_literal: true
# rubocop:todo all
# Copyright (C) 2021 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
module Mongo
class Server
class ConnectionPool
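# Manages connection generations for a server's connection pool. A
# generation is tracked per service id (for load-balanced deployments)
# or under the nil key (for all other deployments), together with an
# IO pipe that can be closed to wake up Kernel#select calls waiting on
# a connection's socket.
#
# @example Bumping the generation (illustrative sketch; assumes a
#   non-load-balanced +server+ obtained from the owning pool).
#   manager = GenerationManager.new(server: server)
#   manager.generation # => 1
#   manager.bump
#   manager.generation # => 2
#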
# @api private
class GenerationManager
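# @param [ Server ] server The server that owns the connection pool
#   this generation manager belongs to.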
def initialize(server:)
@map = Hash.new { |hash, key| hash[key] = 1 }
@pipe_fds = Hash.new { |hash, key| hash[key] = { 1 => IO.pipe } }
@server = server
@lock = Mutex.new
@scheduled_for_close = []
end
attr_reader :server
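# Returns the current generation for the given service, acquiring the
# manager's lock.
#
# @param [ Object | nil ] service_id The service id, which must be
#   given if and only if the server is a load balancer.
#
# @return [ Integer ] The current generation.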
def generation(service_id: nil)
validate_service_id!(service_id)
@lock.synchronize do
@map[service_id]
end
end
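# Returns the current generation for the given service without
# acquiring the manager's lock. Callers are responsible for any
# synchronization that is required.
#
# @param [ Object | nil ] service_id The service id, which must be
#   given if and only if the server is a load balancer.
#
# @return [ Integer ] The current generation.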
def generation_unlocked(service_id: nil)
validate_service_id!(service_id)
@map[service_id]
end
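# Returns the pipe associated with the current generation for the
# given service.
#
# @param [ Object | nil ] service_id The service id.
#
# @return [ Array<IO> | nil ] The read and write ends of the pipe, if
#   one exists for the current generation.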
def pipe_fds(service_id: nil)
@pipe_fds.dig(service_id, @map[service_id])
end
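# Removes the pipe for the given generation, closing its write end
# immediately and scheduling its read end to be closed later.
#
# @param [ Integer ] generation The generation whose pipe is removed.
# @param [ Object | nil ] service_id The service id, which must be
#   given if and only if the server is a load balancer.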
def remove_pipe_fds(generation, service_id: nil)
validate_service_id!(service_id)
r, w = @pipe_fds[service_id].delete(generation)
return unless r && w
w.close
# Schedule the read end of the pipe to be closed. We cannot close it
# immediately since we need to wait for any Kernel#select calls to
# notice that part of the pipe is closed, and check the socket. This
# all happens when attempting to read from the socket and waiting for
# it to become ready again.
@scheduled_for_close << r
end
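# Increments the generation and creates a pipe for the new generation.
# When a service id is given, only that service's generation is
# bumped; otherwise every generation known to the manager is bumped.
# Any read ends previously scheduled for closing are closed first.
#
# @param [ Object | nil ] service_id The service id.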
def bump(service_id: nil)
@lock.synchronize do
close_all_scheduled
if service_id
gen = @map[service_id] += 1
@pipe_fds[service_id] ||= {}
@pipe_fds[service_id][gen] = IO.pipe
else
# When service id is not supplied, one of two things may be
# happening:
#
# 1. The pool is not connected to a load balancer, in which case we only
# need to increment the generation for the nil service_id.
# 2. The pool is connected to a load balancer, in which case we need to
# increment the generation for each service.
#
# Incrementing everything in the map accomplishes both tasks.
@map.each do |k, v|
gen = @map[k] += 1
# Store the new pipe under the service's own key so that
# pipe_fds(service_id: k) can find it for the bumped generation.
@pipe_fds[k] ||= {}
@pipe_fds[k][gen] = IO.pipe
end
end
end
end
# Close all pipes in the generation manager.
#
# This method should be called only when the +ConnectionPool+ that
# owns this +GenerationManager+ is closed, to ensure that all
# pipes are closed properly.
def close_all_pipes
@lock.synchronize do
close_all_scheduled
@pipe_fds.keys.each do |service_id|
generations = @pipe_fds.delete(service_id)
generations.values.each do |(r, w)|
r.close
w.close
rescue IOError
# Ignore any IOError that occurs when closing the
# pipe, as there is nothing we can do about it.
end
end
end
end
private
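# Validates that a service id is given if and only if the server is a
# load balancer.
#
# @param [ Object | nil ] service_id The service id to validate.
#
# @raise [ ArgumentError ] If a service id is given for a server that
#   is not a load balancer, or omitted for a load balancer.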
def validate_service_id!(service_id)
if service_id
unless server.load_balancer?
raise ArgumentError, "Generation scoping to services is only available in load-balanced mode, but the server at #{server.address} is not a load balancer"
end
else
if server.load_balancer?
raise ArgumentError, "The server at #{server.address} is a load balancer and therefore does not have a single global generation"
end
end
end
# Close all fds scheduled for closing.
def close_all_scheduled
while pipe = @scheduled_for_close.pop
pipe.close
end
end
end
end
end
end