File: connection_pool.rb

package info (click to toggle)
ruby-sshkit 1.21.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 700 kB
  • sloc: ruby: 3,522; makefile: 2
file content (183 lines) | stat: -rw-r--r-- 6,060 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
require "monitor"
require "thread"

# Since we call to_s on new connection arguments and use that as a cache key, we
# need to make sure the memory address of the object is not used as part of the
# key. Otherwise identical objects with different memory address won't reuse the
# cache.
#
# In the case of proxy commands, this can lead to proxy processes leaking, and
# in severe cases can cause deploys to fail due to default file descriptor
# limits. An alternate solution would be to use a different means of generating
# hash keys.
#
require "net/ssh/proxy/command"
class Net::SSH::Proxy::Command
  # Ensure a stable string value is used, rather than memory address.
  def inspect
    @command_line_template
  end
end

# The ConnectionPool caches connections and allows them to be reused, so long as
# the reuse happens within the `idle_timeout` period. Timed out connections are
# eventually closed, forcing a new connection to be used in that case.
#
# Additionally, a background thread is started to check for abandoned
# connections that have timed out without any attempt at being reused. These
# are eventually closed as well and removed from the cache.
#
# If `idle_timeout` set to `false`, `0`, or `nil`, no caching is performed, and
# a new connection is created and then immediately closed each time. The default
# timeout is 30 (seconds).
#
# There is a single public method: `with`. Example usage:
#
#   pool = SSHKit::Backend::ConnectionPool.new
#   pool.with(Net::SSH.method(:start), "host", "username") do |connection|
#     # do stuff with connection
#   end
#
class SSHKit::Backend::ConnectionPool
  attr_accessor :idle_timeout

  def initialize(idle_timeout=30)
    @idle_timeout = idle_timeout
    @caches = {}
    @caches.extend(MonitorMixin)
    @timed_out_connections = Queue.new

    # Spin up eviction loop only if caching is enabled
    if cache_enabled?
      Thread.new { run_eviction_loop }
    end
  end

  # Creates a new connection or reuses a cached connection (if possible) and
  # yields the connection to the given block. Connections are created by
  # invoking the `connection_factory` proc with the given `args`. The arguments
  # are used to construct a key used for caching.
  def with(connection_factory, *args)
    cache = find_cache(args)
    conn = cache.pop || begin
      connection_factory.call(*args)
    end
    yield(conn)
  ensure
    cache.push(conn) unless conn.nil?
    # Sometimes the args mutate as a result of opening a connection. In this
    # case we need to update the cache key to match the new args.
    update_key_if_args_changed(cache, args)
  end

  # Immediately remove all cached connections, without closing them. This only
  # exists for unit test purposes.
  def flush_connections
    caches.synchronize { caches.clear }
  end

  # Immediately close all cached connections and empty the pool.
  def close_connections
    caches.synchronize do
      caches.values.each(&:clear)
      caches.clear
      process_deferred_close
    end
  end

  protected

  attr_reader :caches, :timed_out_connections

  private

  def cache_key_for_connection_args(args)
    args.hash
  end

  def cache_enabled?
    idle_timeout && idle_timeout > 0
  end

  # Look up a Cache that matches the given connection arguments.
  def find_cache(args)
    if cache_enabled?
      key = cache_key_for_connection_args(args)
      caches[key] || thread_safe_find_or_create_cache(key)
    else
      NilCache.new(method(:silently_close_connection))
    end
  end

  # Cache creation needs to happen in a mutex, because otherwise a race
  # condition might cause two identical caches to be created for the same key.
  def thread_safe_find_or_create_cache(key)
    caches.synchronize do
      caches[key] ||= begin
        Cache.new(key, idle_timeout, method(:silently_close_connection_later))
      end
    end
  end

  # Update cache key with changed args to prevent cache miss
  def update_key_if_args_changed(cache, args)
    new_key = cache_key_for_connection_args(args)

    caches.synchronize do
      return if cache.same_key?(new_key)
      caches[new_key] = caches.delete(cache.key)
      cache.key = new_key
    end
  end

  # Loops indefinitely to close connections and to find abandoned connections
  # that need to be closed.
  def run_eviction_loop
    loop do
      process_deferred_close

      # Periodically sweep all Caches to evict stale connections
      sleep(5)
      caches.values.each(&:evict)
    end
  end

  # Immediately close any connections that are pending closure.
  # rubocop:disable Lint/HandleExceptions
  def process_deferred_close
    until timed_out_connections.empty?
      connection = timed_out_connections.pop(true)
      silently_close_connection(connection)
    end
  rescue ThreadError
    # Queue#pop(true) raises ThreadError if the queue is empty.
    # This could only happen if `close_connections` is called at the same time
    # the background eviction thread has woken up to close connections. In any
    # case, it is not something we need to care about, since an empty queue is
    # perfectly OK.
  end
  # rubocop:enable Lint/HandleExceptions

  # Adds the connection to a queue that is processed asynchronously by a
  # background thread. The connection will eventually be closed.
  def silently_close_connection_later(connection)
    timed_out_connections << connection
  end

  # Close the given `connection` immediately, assuming it responds to a `close`
  # method. If it doesn't, or if `nil` is provided, it is silently ignored. Any
  # `StandardError` is also silently ignored. Returns `true` if the connection
  # was closed; `false` if it was already closed or could not be closed due to
  # an error.
  def silently_close_connection(connection)
    return false unless connection.respond_to?(:close)
    return false if connection.respond_to?(:closed?) && connection.closed?
    connection.close
    true
  rescue StandardError
    false
  end
end

require "sshkit/backends/connection_pool/cache"
require "sshkit/backends/connection_pool/nil_cache"