require "spec/helper/all"

describe EM::Synchrony::AMQP do

  xit "should yield until connection is ready" do
    EM.synchrony do
      connection = EM::Synchrony::AMQP.connect
      connection.connected?.should eq(true)
      EM.stop
    end
  end

  xit "should yield until disconnection is complete" do
    EM.synchrony do
      connection = EM::Synchrony::AMQP.connect
      connection.disconnect
      connection.connected?.should eq(false)
      EM.stop
    end
  end

  xit "should yield until the channel is created" do
    EM.synchrony do
      connection = EM::Synchrony::AMQP.connect
      channel = EM::Synchrony::AMQP::Channel.new(connection)
      channel.should be_kind_of(EM::Synchrony::AMQP::Channel)
      EM.stop
    end
  end

  xit "should yield until the queue is created" do
    EM.synchrony do
      connection = EM::Synchrony::AMQP.connect
      channel = EM::Synchrony::AMQP::Channel.new(connection)
      queue = EM::Synchrony::AMQP::Queue.new(channel, "test.em-synchrony.queue1", :auto_delete => true)
      EM.stop
    end
  end

  xit "should yield when a queue is created from a channel" do
    EM.synchrony do
      connection = EM::Synchrony::AMQP.connect
      channel = EM::Synchrony::AMQP::Channel.new(connection)
      queue = channel.queue("test.em-synchrony.queue1", :auto_delete => true)
      EM.stop
    end
  end

  xit "should yield until the exchange is created" do
    EM.synchrony do
      connection = EM::Synchrony::AMQP.connect
      channel = EM::Synchrony::AMQP::Channel.new(connection)

      exchange = EM::Synchrony::AMQP::Exchange.new(channel, :fanout, "test.em-synchrony.exchange")
      exchange.should be_kind_of(EventMachine::Synchrony::AMQP::Exchange)

      [:direct, :fanout, :topic, :headers].each do |type|
        # Exercise cached exchange code path
        2.times.map { channel.send(type, "test.em-synchrony.#{type}") }.each do |ex|
          ex.should be_kind_of(EventMachine::Synchrony::AMQP::Exchange)
        end
      end

      EM.stop
    end
  end

  xit "should publish and receive messages" do
    nb_msg = 10
    EM.synchrony do
      connection = EM::Synchrony::AMQP.connect
      channel = EM::Synchrony::AMQP::Channel.new(connection)
      ex = EM::Synchrony::AMQP::Exchange.new(channel, :fanout, "test.em-synchrony.fanout")

      q1 = channel.queue("test.em-synchrony.queues.1", :auto_delete => true)
      q2 = channel.queue("test.em-synchrony.queues.2", :auto_delete => true)

      q1.bind(ex)
      q2.bind(ex)

      nb_q1, nb_q2 = 0, 0
      stop_cb = proc { EM.stop if nb_q1 + nb_q2 == 2 * nb_msg }

      q1.subscribe(:ack => false) do |meta, msg|
        msg.should match(/^Bonjour [0-9]+/)
        nb_q1 += 1
        stop_cb.call
      end

      q2.subscribe do |meta, msg|
        msg.should match(/^Bonjour [0-9]+/)
        nb_q2 += 1
        stop_cb.call
      end

      Fiber.new do
        nb_msg.times do |n|
          ex.publish("Bonjour #{n}")
          EM::Synchrony.sleep(0.1)
        end
      end.resume
    end
  end

  xit "should handle several consumers" do
    nb_msg = 10
    EM.synchrony do
      connection = EM::Synchrony::AMQP.connect
      channel = EM::Synchrony::AMQP::Channel.new(connection)
      exchange = EM::Synchrony::AMQP::Exchange.new(channel, :fanout, "test.em-synchrony.consumers.fanout")

      queue = channel.queue("test.em-synchrony.consumers.queue", :auto_delete => true)
      queue.bind(exchange)

      cons1 = EM::Synchrony::AMQP::Consumer.new(channel, queue)
      cons2 = EM::Synchrony::AMQP::Consumer.new(channel, queue)

      nb_cons1, nb_cons2 = 0, 0
      stop_cb = Proc.new do
        if nb_cons1 + nb_cons2 == nb_msg
          nb_cons1.should eq(nb_cons2)
          EM.stop
        end
      end

      cons1.on_delivery do |meta, msg|
        msg.should match(/^Bonjour [0-9]+/)
        nb_cons1 += 1
        stop_cb.call
      end.consume

      cons2.on_delivery do |meta, msg|
        msg.should match(/^Bonjour [0-9]+/)
        nb_cons2 += 1
        stop_cb.call
      end.consume

      10.times do |n|
        exchange.publish("Bonjour #{n}")
        EM::Synchrony.sleep(0.1)
      end
   end
  end
end
