1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
|
require "monitor"
require "thread"
# Since we call to_s on new connection arguments and use that as a cache key, we
# need to make sure the memory address of the object is not used as part of the
# key. Otherwise identical objects with different memory address won't reuse the
# cache.
#
# In the case of proxy commands, this can lead to proxy processes leaking, and
# in severe cases can cause deploys to fail due to default file descriptor
# limits. An alternate solution would be to use a different means of generating
# hash keys.
#
require "net/ssh/proxy/command"
class Net::SSH::Proxy::Command
# Ensure a stable string value is used, rather than memory address.
def inspect
@command_line_template
end
end
# The ConnectionPool caches connections and allows them to be reused, so long as
# the reuse happens within the `idle_timeout` period. Timed out connections are
# eventually closed, forcing a new connection to be used in that case.
#
# Additionally, a background thread is started to check for abandoned
# connections that have timed out without any attempt at being reused. These
# are eventually closed as well and removed from the cache.
#
# If `idle_timeout` set to `false`, `0`, or `nil`, no caching is performed, and
# a new connection is created and then immediately closed each time. The default
# timeout is 30 (seconds).
#
# There is a single public method: `with`. Example usage:
#
# pool = SSHKit::Backend::ConnectionPool.new
# pool.with(Net::SSH.method(:start), "host", "username") do |connection|
# # do stuff with connection
# end
#
class SSHKit::Backend::ConnectionPool
attr_accessor :idle_timeout
def initialize(idle_timeout=30)
@idle_timeout = idle_timeout
@caches = {}
@caches.extend(MonitorMixin)
@timed_out_connections = Queue.new
# Spin up eviction loop only if caching is enabled
if cache_enabled?
Thread.new { run_eviction_loop }
end
end
# Creates a new connection or reuses a cached connection (if possible) and
# yields the connection to the given block. Connections are created by
# invoking the `connection_factory` proc with the given `args`. The arguments
# are used to construct a key used for caching.
def with(connection_factory, *args)
cache = find_cache(args)
conn = cache.pop || begin
connection_factory.call(*args)
end
yield(conn)
ensure
cache.push(conn) unless conn.nil?
# Sometimes the args mutate as a result of opening a connection. In this
# case we need to update the cache key to match the new args.
update_key_if_args_changed(cache, args)
end
# Immediately remove all cached connections, without closing them. This only
# exists for unit test purposes.
def flush_connections
caches.synchronize { caches.clear }
end
# Immediately close all cached connections and empty the pool.
def close_connections
caches.synchronize do
caches.values.each(&:clear)
caches.clear
process_deferred_close
end
end
protected
attr_reader :caches, :timed_out_connections
private
def cache_key_for_connection_args(args)
args.hash
end
def cache_enabled?
idle_timeout && idle_timeout > 0
end
# Look up a Cache that matches the given connection arguments.
def find_cache(args)
if cache_enabled?
key = cache_key_for_connection_args(args)
caches[key] || thread_safe_find_or_create_cache(key)
else
NilCache.new(method(:silently_close_connection))
end
end
# Cache creation needs to happen in a mutex, because otherwise a race
# condition might cause two identical caches to be created for the same key.
def thread_safe_find_or_create_cache(key)
caches.synchronize do
caches[key] ||= begin
Cache.new(key, idle_timeout, method(:silently_close_connection_later))
end
end
end
# Update cache key with changed args to prevent cache miss
def update_key_if_args_changed(cache, args)
new_key = cache_key_for_connection_args(args)
caches.synchronize do
return if cache.same_key?(new_key)
caches[new_key] = caches.delete(cache.key)
cache.key = new_key
end
end
# Loops indefinitely to close connections and to find abandoned connections
# that need to be closed.
def run_eviction_loop
loop do
process_deferred_close
# Periodically sweep all Caches to evict stale connections
sleep(5)
caches.values.each(&:evict)
end
end
# Immediately close any connections that are pending closure.
# rubocop:disable Lint/HandleExceptions
def process_deferred_close
until timed_out_connections.empty?
connection = timed_out_connections.pop(true)
silently_close_connection(connection)
end
rescue ThreadError
# Queue#pop(true) raises ThreadError if the queue is empty.
# This could only happen if `close_connections` is called at the same time
# the background eviction thread has woken up to close connections. In any
# case, it is not something we need to care about, since an empty queue is
# perfectly OK.
end
# rubocop:enable Lint/HandleExceptions
# Adds the connection to a queue that is processed asynchronously by a
# background thread. The connection will eventually be closed.
def silently_close_connection_later(connection)
timed_out_connections << connection
end
# Close the given `connection` immediately, assuming it responds to a `close`
# method. If it doesn't, or if `nil` is provided, it is silently ignored. Any
# `StandardError` is also silently ignored. Returns `true` if the connection
# was closed; `false` if it was already closed or could not be closed due to
# an error.
def silently_close_connection(connection)
return false unless connection.respond_to?(:close)
return false if connection.respond_to?(:closed?) && connection.closed?
connection.close
true
rescue StandardError
false
end
end
require "sshkit/backends/connection_pool/cache"
require "sshkit/backends/connection_pool/nil_cache"
|