require "monitor"
require "thread"

# Since we call to_s on new connection arguments and use that as a cache key, we
# need to make sure the memory address of the object is not used as part of the
# key (Array#to_s calls inspect on each element, which is why inspect is
# patched below). Otherwise identical objects with different memory addresses
# won't reuse the cache.
#
# In the case of proxy commands, this can lead to proxy processes leaking, and
# in severe cases can cause deploys to fail due to default file descriptor
# limits. An alternate solution would be to use a different means of generating
# hash keys.
#
require "net/ssh/proxy/command"
class Net::SSH::Proxy::Command
  # Ensure a stable string value is used, rather than memory address.
  def inspect
    @command_line_template
  end
end
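
# With the patch above, two proxy objects built from the same command-line
# template stringify identically, so connection arguments that include them map
# to the same cache key. A sketch (the command string is purely illustrative):
#
#   a = Net::SSH::Proxy::Command.new("ssh gateway -W %h:%p")
#   b = Net::SSH::Proxy::Command.new("ssh gateway -W %h:%p")
#   a.inspect == b.inspect # => true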

# The ConnectionPool caches connections and allows them to be reused, so long as
# the reuse happens within the `idle_timeout` period. Timed out connections are
# eventually closed, forcing a new connection to be used in that case.
#
# Additionally, a background thread is started to check for abandoned
# connections that have timed out without any attempt at being reused. These
# are eventually closed as well and removed from the cache.
#
# If `idle_timeout` is set to `false`, `0`, or `nil`, no caching is performed,
# and a new connection is created and then immediately closed each time. The
# default timeout is 30 seconds.
#
# There is a single public method: `with`. Example usage:
#
#   pool = SSHKit::Backend::ConnectionPool.new
#   pool.with(Net::SSH.method(:start), "host", "username") do |connection|
#     # do stuff with connection
#   end
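#
# To disable caching (see above), pass a falsy idle timeout to the constructor,
# e.g. (a sketch):
#
#   uncached = SSHKit::Backend::ConnectionPool.new(nil)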
#
class SSHKit::Backend::ConnectionPool
  attr_accessor :idle_timeout

  def initialize(idle_timeout=30)
    @idle_timeout = idle_timeout
    @caches = {}
    @caches.extend(MonitorMixin)
    @timed_out_connections = Queue.new

    # Spin up eviction loop only if caching is enabled
    if cache_enabled?
      Thread.new { run_eviction_loop }
    end
  end

  # Creates a new connection or reuses a cached connection (if possible) and
  # yields the connection to the given block. Connections are created by
  # invoking the `connection_factory` proc with the given `args`. The same
  # arguments also form the basis of the cache key.
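  #
  # For example (a sketch; `factory` stands in for any connection-opening proc,
  # such as `Net::SSH.method(:start)`):
  #
  #   pool.with(factory, "host", "username") do |ssh|
  #     ssh.exec!("uptime")
  #   end
  #   # A second call within `idle_timeout` with the same arguments reuses the
  #   # cached connection instead of invoking `factory` again.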
  def with(connection_factory, *args)
    cache = find_cache(args)
    conn = cache.pop || connection_factory.call(*args)
    yield(conn)
  ensure
    cache.push(conn) unless conn.nil?
    # Sometimes the args mutate as a result of opening a connection. In this
    # case we need to update the cache key to match the new args.
    update_key_if_args_changed(cache, args)
  end

  # Immediately remove all cached connections, without closing them. This only
  # exists for unit test purposes.
  def flush_connections
    caches.synchronize { caches.clear }
  end

  # Immediately close all cached connections and empty the pool.
  def close_connections
    caches.synchronize do
      caches.values.each(&:clear)
      caches.clear
      process_deferred_close
    end
  end

  private

  attr_reader :caches, :timed_out_connections

  # Builds a stable string key from the given connection arguments. This relies
  # on `to_s` (which calls `inspect` on each element) producing the same value
  # for equivalent arguments; see the Net::SSH::Proxy::Command patch above.
  def cache_key_for_connection_args(args)
    args.to_s
  end

  def cache_enabled?
    idle_timeout && idle_timeout > 0
  end

  # Look up a Cache that matches the given connection arguments.
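  # When caching is disabled this returns a NilCache instead, which never
  # retains connections: each `with` call then builds a fresh connection and
  # closes it as soon as the block returns (see connection_pool/nil_cache.rb).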
  def find_cache(args)
    if cache_enabled?
      key = cache_key_for_connection_args(args)
      caches[key] || thread_safe_find_or_create_cache(key)
    else
      NilCache.new(method(:silently_close_connection))
    end
  end

  # Cache creation needs to happen in a mutex, because otherwise a race
  # condition might cause two identical caches to be created for the same key.
  def thread_safe_find_or_create_cache(key)
    caches.synchronize do
      caches[key] ||= Cache.new(key, idle_timeout,
                                method(:silently_close_connection_later))
    end
  end

  # Update the cache key with the changed args so that later lookups with the
  # mutated args still hit the cache.
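  # A typical case (assumed here, not enforced) is a connection factory that
  # fills in defaults on the options hash it was given, which changes the
  # arguments' string representation and therefore the key.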
  def update_key_if_args_changed(cache, args)
    new_key = cache_key_for_connection_args(args)

    caches.synchronize do
      return if cache.same_key?(new_key)
      caches[new_key] = caches.delete(cache.key)
      cache.key = new_key
    end
  end

  # Loops indefinitely, closing connections that are pending closure and
  # sweeping the caches for abandoned connections that have timed out.
  def run_eviction_loop
    loop do
      process_deferred_close

      # Periodically sweep all Caches to evict stale connections
      sleep(5)
      caches.values.each(&:evict)
    end
  end

  # Immediately close any connections that are pending closure.
  # rubocop:disable Lint/HandleExceptions
  def process_deferred_close
    until timed_out_connections.empty?
      connection = timed_out_connections.pop(true)
      silently_close_connection(connection)
    end
  rescue ThreadError
    # Queue#pop(true) raises ThreadError if the queue is empty.
    # This could only happen if `close_connections` is called at the same time
    # the background eviction thread has woken up to close connections. In any
    # case, it is not something we need to care about, since an empty queue is
    # perfectly OK.
  end
  # rubocop:enable Lint/HandleExceptions

  # Adds the connection to a queue that is processed asynchronously by a
  # background thread. The connection will eventually be closed.
  def silently_close_connection_later(connection)
    timed_out_connections << connection
  end

  # Close the given `connection` immediately, assuming it responds to a `close`
  # method. If it doesn't, or if `nil` is provided, it is silently ignored. Any
  # `StandardError` is also silently ignored. Returns `true` if the connection
  # was closed; `false` if it was already closed or could not be closed due to
  # an error.
  def silently_close_connection(connection)
    return false unless connection.respond_to?(:close)
    return false if connection.respond_to?(:closed?) && connection.closed?
    connection.close
    true
  rescue StandardError
    false
  end
end

require "sshkit/backends/connection_pool/cache"
require "sshkit/backends/connection_pool/nil_cache"
