require "spec_helper"
require "rabbitmq/http/client"

require "bunny/concurrent/condition"

describe "Connection recovery" do
  # HTTP API client used by the helpers below to list and close connections.
  let(:http_client) do
    RabbitMQ::HTTP::Client.new("http://127.0.0.1:15672")
  end

  # Logs to stderr; level comes from BUNNY_LOG_LEVEL and falls back to WARN.
  let(:logger) do
    Logger.new($stderr).tap do |log|
      log.level = ENV.fetch("BUNNY_LOG_LEVEL", Logger::WARN)
    end
  end

  # Passed to Bunny as network_recovery_interval by the with_open* helpers.
  let(:recovery_interval) do
    0.2
  end

  it "reconnects after grace period" do
    # Close every connection via the HTTP API, then wait until one is listed again.
    with_open do |_conn|
      close_all_connections!
      wait_for_recovery_with do
        connections.any?
      end
    end
  end

  it "reconnects after grace period (with multiple hosts)" do
    # Same scenario as the single-host case, with a connection that was given two hosts.
    with_open_multi_host do |_conn|
      close_all_connections!
      wait_for_recovery_with do
        connections.any?
      end
    end
  end

  it "reconnects after grace period (with multiple hosts, including a broken one)" do
    # The host list used here starts with an entry named "broken".
    with_open_multi_broken_host do |_conn|
      close_all_connections!
      wait_for_recovery_with do
        connections.any?
      end
    end
  end

  it "recovers channels" do
    with_open do |conn|
      first_channel  = conn.create_channel
      second_channel = conn.create_channel
      # Pause before and after the forced close, then wait for the HTTP API
      # to report exactly two channels.
      sleep 1.5
      close_all_connections!
      sleep 0.5
      poll_until do
        channels.count == 2
      end
      [first_channel, second_channel].each do |channel|
        expect(channel).to be_open
      end
    end
  end

  it "provides a recovery completion callback" do
    with_open do |c|
      # Flipped by the callback and polled below. Polling
      # Bunny::Concurrent::Condition#none_threads_waiting? instead would hold
      # even if the callback never ran, because no thread in this example
      # ever waits on the condition.
      recovery_completed = false
      c.after_recovery_completed do
        recovery_completed = true
      end

      ch = c.create_channel
      sleep 1.0
      close_all_connections!
      poll_until { c.open? && ch.open? }
      # Only passes once the callback has actually been invoked.
      poll_until { recovery_completed }
    end
  end

  it "recovers channels (with multiple hosts)" do
    with_open_multi_host do |conn|
      first_channel  = conn.create_channel
      second_channel = conn.create_channel
      # Pause, force-close, pause again, then wait for two channels to be listed.
      sleep 1.5
      close_all_connections!
      sleep 0.5
      poll_until do
        channels.count == 2
      end
      [first_channel, second_channel].each do |channel|
        expect(channel).to be_open
      end
    end
  end

  it "recovers channels (with multiple hosts, including a broken one)" do
    with_open_multi_broken_host do |conn|
      first_channel  = conn.create_channel
      second_channel = conn.create_channel
      # Pause, force-close, pause again, then wait for two channels to be listed.
      sleep 1.5
      close_all_connections!
      sleep 0.5
      poll_until do
        channels.count == 2
      end
      [first_channel, second_channel].each do |channel|
        expect(channel).to be_open
      end
    end
  end

  it "recovers basic.qos prefetch setting" do
    with_open do |conn|
      channel = conn.create_channel
      channel.prefetch(11)

      # The same pair of checks runs before the close and after recovery.
      verify_qos = lambda do
        expect(channel.prefetch_count).to eq 11
        expect(channel.prefetch_global).to be false
      end

      verify_qos.call
      sleep 1.5
      close_all_connections!
      sleep 0.5
      wait_for_recovery_with do
        connections.any?
      end
      expect(channel).to be_open
      verify_qos.call
    end
  end

  it "recovers basic.qos prefetch global setting" do
    with_open do |conn|
      channel = conn.create_channel
      channel.prefetch(42, true)

      # The same pair of checks runs before the close and after recovery.
      verify_qos = lambda do
        expect(channel.prefetch_count).to eq 42
        expect(channel.prefetch_global).to be true
      end

      verify_qos.call
      sleep 1.5
      close_all_connections!
      sleep 0.5
      wait_for_recovery_with do
        connections.any?
      end
      expect(channel).to be_open
      verify_qos.call
    end
  end

  it "recovers publisher confirms setting" do
    with_open do |conn|
      channel = conn.create_channel
      channel.confirm_select
      # Confirm mode must be reported both before the close and after recovery.
      expect(channel).to be_using_publisher_confirms
      sleep 1.5
      close_all_connections!
      sleep 0.5
      wait_for_recovery_with do
        connections.any?
      end
      expect(channel).to be_open
      expect(channel).to be_using_publisher_confirms
    end
  end

  it "recovers transactionality setting" do
    with_open do |conn|
      channel = conn.create_channel
      channel.tx_select
      # Transaction mode must be reported both before the close and after recovery.
      expect(channel).to be_using_tx
      sleep 1.5
      close_all_connections!
      sleep 0.5
      wait_for_recovery_with do
        connections.any?
      end
      expect(channel).to be_open
      expect(channel).to be_using_tx
    end
  end

  it "recovers client-named queues" do
    with_open do |conn|
      channel    = conn.create_channel
      # A random suffix is appended to the queue name.
      queue_name = "bunny.tests.recovery.client-named#{rand}"
      queue      = channel.queue(queue_name)

      close_all_connections!
      wait_for_recovery_with do
        connections.any?
      end

      expect(channel).to be_open
      ensure_queue_recovery(channel, queue)
      queue.delete
    end
  end

  # a very simplistic test for queues inspired by #422
  it "recovers client-named queues declared with no_declare: true" do
    with_open do |conn|
      declaring_channel = conn.create_channel
      second_channel    = conn.create_channel

      suffix     = rand
      queue_name = "bunny.tests.recovery.client-named#{suffix}"

      queue = declaring_channel.queue(queue_name)
      # Second handle on the same name via another channel; its return value
      # is not used by the rest of the example.
      second_channel.queue(queue_name, no_declare: true)

      close_all_connections!
      wait_for_recovery_with do
        connections.any?
      end

      expect(declaring_channel).to be_open
      ensure_queue_recovery(declaring_channel, queue)
      queue.delete
    end
  end

  # a test for #422
  it "recovers client-named queues declared with passive: true" do
    with_open do |conn|
      declaring_channel = conn.create_channel
      passive_channel   = conn.create_channel

      suffix     = rand
      queue_name = "bunny.tests.recovery.client-named#{suffix}"

      queue         = declaring_channel.queue(queue_name)
      passive_queue = passive_channel.queue(queue_name, passive: true)

      close_all_connections!
      wait_for_recovery_with do
        connections.any?
      end

      expect(declaring_channel).to be_open
      # Both handles are checked through the first channel.
      [queue, passive_queue].each do |handle|
        ensure_queue_recovery(declaring_channel, handle)
      end
      queue.delete
    end
  end

  it "recovers server-named queues" do
    with_open do |conn|
      channel = conn.create_channel
      # An empty name is passed, so the queue name is not chosen by this example.
      queue = channel.queue("", exclusive: true)

      close_all_connections!
      wait_for_recovery_with do
        connections.any?
      end

      expect(channel).to be_open
      ensure_queue_recovery(channel, queue)
    end
  end

  it "recovers queue bindings" do
    with_open do |conn|
      channel  = conn.create_channel
      exchange = channel.fanout("amq.fanout")
      queue    = channel.queue("", exclusive: true)
      queue.bind(exchange)

      close_all_connections!
      wait_for_recovery_with do
        connections.any?
      end

      expect(channel).to be_open
      # Publishes through the exchange and expects one message in the queue.
      ensure_queue_binding_recovery(channel, exchange, queue)
    end
  end

  it "recovers exchanges and their bindings" do
    with_open do |conn|
      channel    = conn.create_channel
      upstream   = channel.fanout("source.exchange.recovery.example", auto_delete: true)
      downstream = channel.fanout("destination.exchange.recovery.example", auto_delete: true)

      downstream.bind(upstream)

      # An auto-delete exchange is only removed on connection loss once an
      # exclusive queue has been bound to it, hence the two queues below.
      downstream_queue = channel.queue("", exclusive: true)
      downstream_queue.bind(downstream, routing_key: "")

      upstream_queue = channel.queue("", exclusive: true)
      upstream_queue.bind(upstream, routing_key: "")

      close_all_connections!

      wait_for_recovery_with do
        connections.any? && exchange_names_in_vhost("/").include?(upstream.name)
      end
      channel.confirm_select

      # One message published to the source must reach the destination's queue.
      upstream.publish("msg", routing_key: "")
      channel.wait_for_confirms
      expect(downstream_queue.message_count).to eq 1
      downstream.delete
    end
  end

  it "recovers passively declared exchanges and their bindings" do
    with_open do |conn|
      channel = conn.create_channel
      channel.confirm_select

      # The source is the existing "amq.fanout" exchange, declared passively.
      upstream   = channel.fanout("amq.fanout", passive: true)
      downstream = channel.fanout("destination.exchange.recovery.example", auto_delete: true)

      downstream.bind(upstream)

      # An auto-delete exchange is only removed on connection loss once an
      # exclusive queue has been bound to it, hence the two queues below.
      downstream_queue = channel.queue("", exclusive: true)
      downstream_queue.bind(downstream, routing_key: "")

      upstream_queue = channel.queue("", exclusive: true)
      upstream_queue.bind(upstream, routing_key: "")

      close_all_connections!

      wait_for_recovery_with do
        connections.any?
      end

      upstream.publish("msg", routing_key: "")
      channel.wait_for_confirms

      expect(downstream_queue.message_count).to eq 1
      downstream.delete
    end
  end

  # this is a simplistic test that primarily exercises the code path from #412
  it "recovers exchanges that were declared with passive = true" do
    # NOTE(review): the description says "passive = true", but the second
    # exchange handle below is created with no_declare: true — confirm which
    # of the two is intended.
    with_open do |conn|
      channel       = conn.create_channel
      other_channel = conn.create_channel
      upstream      = channel.fanout("source.exchange.recovery.example", auto_delete: true)
      downstream    = channel.fanout("destination.exchange.recovery.example", auto_delete: true)

      undeclared_upstream = other_channel.fanout("source.exchange.recovery.example", no_declare: true)

      downstream.bind(upstream)

      # An auto-delete exchange is only removed on connection loss once an
      # exclusive queue has been bound to it, hence the two queues below.
      downstream_queue = channel.queue("", exclusive: true)
      downstream_queue.bind(downstream, routing_key: "")

      upstream_queue = channel.queue("", exclusive: true)
      upstream_queue.bind(upstream, routing_key: "")

      close_all_connections!

      wait_for_recovery_with do
        connections.any? && exchange_names_in_vhost("/").include?(upstream.name)
      end

      other_channel.confirm_select

      # Publish through the no_declare handle on the second channel.
      undeclared_upstream.publish("msg", routing_key: "")
      other_channel.wait_for_confirms
      expect(downstream_queue.message_count).to eq 1
    end
  end

  it "recovers allocated channel ids" do
    with_open do |conn|
      queue_name = "queue#{Bunny::Timestamp.now.to_i}"
      10.times do
        conn.create_channel
      end
      expect(conn.queue_exists?(queue_name)).to eq false

      close_all_connections!
      wait_for_recovery_with do
        channels.any?
      end

      # make sure the connection isn't closed shortly after
      # due to "second 'channel.open' seen". MK.
      expect(conn).to be_open
      2.times do
        sleep 0.1
        expect(conn).to be_open
      end
    end
  end

  it "recovers consumers" do
    with_open do |conn|
      # Set by the consumer block once a delivery arrives.
      delivered = false

      channel = conn.create_channel
      channel.confirm_select
      queue = channel.queue("", exclusive: true)
      queue.subscribe { |_delivery_info, _properties, _payload| delivered = true }

      close_all_connections!
      wait_for_recovery_with do
        connections.any?
      end
      expect(channel).to be_open

      queue.publish("")
      channel.wait_for_confirms

      poll_until do
        delivered
      end
    end
  end

  it "recovers all consumers" do
    consumer_total = 32

    with_open do |conn|
      channel = conn.create_channel
      queue   = channel.queue("", exclusive: true)
      # Register the consumers with no-op handlers.
      consumer_total.times do
        queue.subscribe { |_delivery_info, _properties, _payload| }
      end

      close_all_connections!
      wait_for_recovery_with do
        connections.any?
      end
      expect(channel).to be_open
      sleep 0.5

      expect(queue.consumer_count).to eq consumer_total
    end
  end

  it "recovers all queues" do
    queue_total = 32

    with_open do |conn|
      channel = conn.create_channel
      queues  = Array.new(queue_total) { channel.queue("", exclusive: true) }

      close_all_connections!
      # Recovery is considered under way once the first queue's name is listed.
      wait_for_recovery_with do
        queue_names.include?(queues.first.name)
      end
      sleep 0.5
      expect(channel).to be_open

      # Passively re-declare every queue by name.
      queues.each do |queue|
        channel.queue_declare(queue.name, passive: true)
      end
    end
  end

  # Returns the "name" field of every exchange the HTTP API lists for +vhost+.
  def exchange_names_in_vhost(vhost)
    exchanges = http_client.list_exchanges(vhost)
    exchanges.map do |exchange|
      exchange["name"]
    end
  end

  # Returns whatever the HTTP API client lists as connections.
  def connections
    client = http_client
    client.list_connections
  end

  # Returns whatever the HTTP API client lists as channels.
  def channels
    client = http_client
    client.list_channels
  end

  # Returns the "name" field of every queue the HTTP API lists.
  def queue_names
    queues = http_client.list_queues
    queues.map do |queue|
      queue["name"]
    end
  end

  # Closes every connection currently listed by the HTTP API.
  def close_all_connections!
    # Give whatever happened before this call a chance to propagate,
    # e.g. so that connections are accounted for in the stats DB.
    #
    # See bin/ci/before_build for management plugin
    # pre-configuration.
    #
    # MK.
    sleep 1.1
    connections.each { |info| close_ignoring_permitted_exceptions(info.name) }
  end

  # Asks the HTTP API to close +connection_name+. Bunny::ConnectionForced and
  # Faraday::ResourceNotFound are swallowed (nil is returned); any other
  # error propagates.
  def close_ignoring_permitted_exceptions(connection_name)
    begin
      http_client.close_connection(connection_name)
    rescue Bunny::ConnectionForced, Faraday::ResourceNotFound
      # ignored
      nil
    end
  end

  # Forwards +probe+ to poll_until.
  #
  # The block argument is passed in parentheses: the bare `poll_until &probe`
  # form triggers Ruby's "`&' interpreted as argument prefix" warning and
  # differs from how the sibling poll_* helpers forward their blocks.
  def wait_for_recovery_with(&probe)
    poll_until(&probe)
  end

  # Delegates to Bunny::TestKit.poll_while, forwarding +probe+ as the block.
  def poll_while(&probe)
    test_kit = Bunny::TestKit
    test_kit.poll_while(&probe)
  end

  # Delegates to Bunny::TestKit.poll_until, forwarding +probe+ as the block.
  def poll_until(&probe)
    test_kit = Bunny::TestKit
    test_kit.poll_until(&probe)
  end

  # Starts +c+, calls the block with it, and always attempts c.close(false)
  # afterwards. When no connection is given, one is built with
  # recovery_interval, recover_from_connection_close: true and logger.
  # Returns the block's result.
  def with_open(c = Bunny.new(network_recovery_interval: recovery_interval,
                              recover_from_connection_close: true,
                              logger: logger), &block)
    begin
      c.start
      block.call(c)
    ensure
      # Best-effort close: any StandardError raised here is discarded.
      begin
        c.close(false)
      rescue StandardError
        nil
      end
    end
  end

  # Runs the block via with_open on a connection given the two hosts
  # "127.0.0.1" and "localhost".
  def with_open_multi_host(&block)
    options = {
      hosts: ["127.0.0.1", "localhost"],
      network_recovery_interval: recovery_interval,
      recover_from_connection_close: true,
      logger: logger
    }
    with_open(Bunny.new(**options), &block)
  end

  # Runs the block via with_open on a connection whose host list starts with
  # the entry "broken".
  def with_open_multi_broken_host(&block)
    # We do not shuffle for these tests so we always hit the broken host
    keep_order = proc { |hosts| hosts }
    options = {
      hosts: ["broken", "127.0.0.1", "localhost"],
      hosts_shuffle_strategy: keep_order,
      network_recovery_interval: recovery_interval,
      recover_from_connection_close: true,
      logger: logger
    }
    with_open(Bunny.new(**options), &block)
  end

  # Checks that +q+ is usable through +ch+: enables confirms, purges the queue,
  # publishes one message to it via the default exchange, expects a message
  # count of 1, then purges again.
  def ensure_queue_recovery(ch, q)
    ch.confirm_select
    q.purge
    ch.default_exchange.publish("msg", routing_key: q.name)
    ch.wait_for_confirms
    depth = q.message_count
    expect(depth).to eq(1)
    q.purge
  end

  # Checks that a message published to +x+ with +routing_key+ (default "")
  # lands in +q+: enables confirms on +ch+, purges the queue, publishes one
  # message, expects a message count of 1, then purges again.
  def ensure_queue_binding_recovery(ch, x, q, routing_key = "")
    ch.confirm_select
    q.purge
    publish_options = { routing_key: routing_key }
    x.publish("msg", **publish_options)
    ch.wait_for_confirms
    depth = q.message_count
    expect(depth).to eq(1)
    q.purge
  end
end
