import socket
from array import array
from struct import pack
from unittest.mock import ANY, Mock, call, patch

import pytest

import amqp
from amqp import Channel, Connection, Message, sasl, spec
from amqp.exceptions import (AccessRefused, ConnectionError, InvalidCommand,
                             NotFound, PreconditionFailed, ResourceLocked)
from amqp.protocol import queue_declare_ok_t
from amqp.serialization import dumps, loads

# (method signature, name of the Connection callback expected to handle it)
# pairs; used to parametrize test_connection_methods.
connection_testdata = (
    (spec.Connection.Blocked, '_on_blocked'),
    (spec.Connection.Unblocked, '_on_unblocked'),
    (spec.Connection.Secure, '_on_secure'),
    (spec.Connection.CloseOk, '_on_close_ok'),
)

# (method signature, name of the Channel callback expected to handle it)
# pairs; used to parametrize test_channel_methods.
channel_testdata = (
    (spec.Basic.Ack, '_on_basic_ack'),
    (spec.Basic.Nack, '_on_basic_nack'),
    (spec.Basic.CancelOk, '_on_basic_cancel_ok'),
)

# (reply_code, reply_text, expected exception class) triples.
# Presumably consumed by an exchange-declare error test that is not visible
# in this part of the file -- verify against the caller.
exchange_declare_error_testdata = (
    (
        503, "COMMAND_INVALID - "
        "unknown exchange type 'exchange-type'",
        InvalidCommand
    ),
    (
        403, "ACCESS_REFUSED - "
        "exchange name 'amq.foo' contains reserved prefix 'amq.*'",
        AccessRefused
    ),
    # NOTE(review): the two adjacent literals below concatenate without a
    # space ("... in vhost '/':received ...") -- confirm this is intended.
    (
        406, "PRECONDITION_FAILED - "
        "inequivalent arg 'type' for exchange 'foo' in vhost '/':"
        "received 'direct' but current is 'fanout'",
        PreconditionFailed
    ),
)

# (reply_code, reply_text, expected exception class) triples used to
# parametrize test_queue_declare_error.
queue_declare_error_testdata = (
    (
        403, "ACCESS_REFUSED - "
        "queue name 'amq.foo' contains reserved prefix 'amq.*",
        AccessRefused
    ),
    (
        404, "NOT_FOUND - "
        "no queue 'foo' in vhost '/'",
        NotFound
    ),
    (
        405, "RESOURCE_LOCKED - "
        "cannot obtain exclusive access to locked queue 'foo' in vhost '/'",
        ResourceLocked
    ),
)

# Client properties the handshake tests expect the library to send in
# Connection.StartOk and to expose as Connection.client_properties.
CLIENT_PROPERTIES = {
    'product': 'py-amqp',
    'product_version': amqp.__version__,
    'capabilities': {
        'consumer_cancel_notify': True,
        'connection.blocked': True,
        'authentication_failure_close': True
    },
}

# Default server properties that handshake() places in the simulated
# Connection.Start frame when the caller does not supply its own.
SERVER_PROPERTIES = {
    'capabilities': {
        'publisher_confirms': True,
        'exchange_exchange_bindings': True,
        'basic.nack': True,
        'consumer_cancel_notify': True,
        'connection.blocked': True,
        'consumer_priorities': True,
        'authentication_failure_close': True,
        'per_consumer_qos': True,
        'direct_reply_to': True
    },
    'cluster_name': 'rabbit@broker.com',
    'copyright': 'Copyright (C) 2007-2018 Pivotal Software, Inc.',
    'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/',
    'platform': 'Erlang/OTP 20.3.8.9',
    'product': 'RabbitMQ',
    'version': '3.7.8'
}


def build_frame_type_1(method, channel=0, args=b'', arg_format=None):
    """Return a ``(1, channel, payload)`` tuple for a type-1 frame.

    The payload is ``method`` packed as two big-endian unsigned shorts,
    followed by ``args`` serialized with ``dumps(arg_format, args)``.
    When ``args`` is empty nothing is serialized and ``arg_format`` is
    not consulted.
    """
    serialized = dumps(arg_format, args) if len(args) > 0 else b''
    signature = pack('>HH', *method)
    return 1, channel, b''.join((signature, serialized))


def build_frame_type_2(body_len, channel, properties):
    """Return a ``(2, channel, payload)`` tuple for a type-2 frame.

    The payload is ``spec.Basic.CLASS_ID`` (unsigned short), two pad
    bytes and ``body_len`` (unsigned 64-bit), all big-endian, followed by
    the already-serialized ``properties`` bytes.
    """
    prefix = pack('>HxxQ', spec.Basic.CLASS_ID, body_len)
    return 2, channel, b''.join((prefix, properties))


def build_frame_type_3(channel, body):
    """Return the ``(3, channel, body)`` tuple for a type-3 frame."""
    frame_type = 3
    return frame_type, channel, body


class DataComparator:
    """Equality helper for asserting on serialized data.

    Use it where a direct comparison of the bytestream is not possible
    (mainly Table values, where the order of items can vary): the other
    operand is decoded with ``loads(argsig, ...)`` and the decoded values
    are compared with ``items``.
    """

    def __init__(self, argsig, items):
        self.argsig = argsig
        self.items = items

    def __eq__(self, other):
        # Only the decoded values matter; the offset returned by loads()
        # is not needed.
        values, _ = loads(self.argsig, other, 0)
        return tuple(values) == tuple(self.items)

    def __repr__(self):
        # Makes failed mock assertions show what was expected instead of
        # an opaque "<DataComparator object at 0x...>".
        return '{}({!r}, {!r})'.format(
            type(self).__name__, self.argsig, self.items
        )


def handshake(conn, transport_mock, server_properties=None):
    """Run ``conn.connect()`` against a simulated broker handshake.

    ``transport_mock().read_frame`` is primed with Connection.Start,
    Connection.Tune and Connection.OpenOk frames before ``conn.connect()``
    is called; afterwards its ``side_effect`` is cleared so the caller can
    install further replies.  ``server_properties`` falls back to
    ``SERVER_PROPERTIES`` when it is None.
    """
    if server_properties is None:
        server_properties = SERVER_PROPERTIES

    start = build_frame_type_1(
        spec.Connection.Start, channel=0,
        args=(0, 9, server_properties, 'AMQPLAIN PLAIN', 'en_US'),
        arg_format='ooFSS'
    )
    tune = build_frame_type_1(
        spec.Connection.Tune, channel=0,
        args=(2047, 131072, 60), arg_format='BlB'
    )
    open_ok = build_frame_type_1(spec.Connection.OpenOk, channel=0)

    transport_mock().read_frame.side_effect = [start, tune, open_ok]
    conn.connect()
    transport_mock().read_frame.side_effect = None


def create_channel(channel_id, conn, transport_mock):
    """Open channel ``channel_id`` on ``conn`` and return it.

    A Channel.OpenOk frame is queued on ``transport_mock().read_frame``
    before ``conn.channel()`` is called; the ``side_effect`` is cleared
    again before returning.
    """
    open_ok = build_frame_type_1(
        spec.Channel.OpenOk,
        channel=channel_id,
        args=(1, False),
        arg_format='Lb'
    )
    transport_mock().read_frame.side_effect = [open_ok]
    channel = conn.channel(channel_id=channel_id)
    transport_mock().read_frame.side_effect = None
    return channel


class test_connection:
    # Integration tests verifying the communication between the library
    # and the broker:
    # * broker responses are simulated by mocking the return values of
    #   the amqp.transport.Transport.read_frame() method
    # * the library's replies to the broker are asserted through the calls
    #   made to the amqp.method_framing.frame_writer() mock

    def _assert_handshake_replies(self, frame_writer_mock, conn,
                                  client_properties):
        # Shared assertions for the handshake tests: the client must have
        # written StartOk, TuneOk and Open (in that order) and must expose
        # the expected client properties on the connection.
        security_mechanism = sasl.AMQPLAIN(
            'guest', 'guest'
        ).start(conn).decode('utf-8', 'surrogatepass')

        frame_writer_mock.assert_has_calls(
            [
                call(
                    1, 0, spec.Connection.StartOk,
                    # Due Table type, we cannot compare bytestream directly
                    DataComparator(
                        'FsSs',
                        (
                            client_properties, 'AMQPLAIN',
                            security_mechanism,
                            'en_US'
                        )
                    ),
                    None
                ),
                call(
                    1, 0, spec.Connection.TuneOk,
                    dumps(
                        'BlB',
                        (conn.channel_max, conn.frame_max, conn.heartbeat)
                    ),
                    None
                ),
                call(
                    1, 0, spec.Connection.Open,
                    dumps('ssb', (conn.virtual_host, '', False)),
                    None
                )
            ]
        )
        assert conn.client_properties == client_properties

    def test_connect(self):
        # Test checking connection handshake
        frame_writer_cls_mock = Mock()
        on_open_mock = Mock()
        frame_writer_mock = frame_writer_cls_mock()
        conn = Connection(
            frame_writer=frame_writer_cls_mock, on_open=on_open_mock
        )

        with patch.object(conn, 'Transport') as transport_mock:
            handshake(conn, transport_mock)
            on_open_mock.assert_called_once_with(conn)

            # Expected responses from client
            self._assert_handshake_replies(
                frame_writer_mock, conn, CLIENT_PROPERTIES
            )

    def test_connect_no_capabilities(self):
        # Test checking connection handshake with broker
        # not supporting capabilities
        frame_writer_cls_mock = Mock()
        on_open_mock = Mock()
        frame_writer_mock = frame_writer_cls_mock()
        conn = Connection(
            frame_writer=frame_writer_cls_mock, on_open=on_open_mock
        )

        with patch.object(conn, 'Transport') as transport_mock:
            server_properties = dict(SERVER_PROPERTIES)
            del server_properties['capabilities']
            client_properties = dict(CLIENT_PROPERTIES)
            del client_properties['capabilities']

            handshake(
                conn, transport_mock, server_properties=server_properties
            )
            on_open_mock.assert_called_once_with(conn)

            # Expected responses from client
            self._assert_handshake_replies(
                frame_writer_mock, conn, client_properties
            )

    def test_connect_missing_capabilities(self):
        # Test checking connection handshake with broker
        # supporting subset of capabilities
        frame_writer_cls_mock = Mock()
        on_open_mock = Mock()
        frame_writer_mock = frame_writer_cls_mock()
        conn = Connection(
            frame_writer=frame_writer_cls_mock, on_open=on_open_mock
        )

        with patch.object(conn, 'Transport') as transport_mock:
            server_properties = dict(SERVER_PROPERTIES)
            server_properties['capabilities'] = {
                # This capability is not supported by client
                'basic.nack': True,
                'consumer_cancel_notify': True,
                'connection.blocked': False,
                # server does not support 'authentication_failure_close'
                # which is supported by client
            }

            client_properties = dict(CLIENT_PROPERTIES)
            client_properties['capabilities'] = {
                'consumer_cancel_notify': True,
            }

            handshake(
                conn, transport_mock, server_properties=server_properties
            )
            on_open_mock.assert_called_once_with(conn)

            # Expected responses from client
            self._assert_handshake_replies(
                frame_writer_mock, conn, client_properties
            )

    def test_connection_close(self):
        # Test checking closing connection
        frame_writer_cls_mock = Mock()
        frame_writer_mock = frame_writer_cls_mock()
        conn = Connection(frame_writer=frame_writer_cls_mock)
        with patch.object(conn, 'Transport') as transport_mock:
            handshake(conn, transport_mock)
            frame_writer_mock.reset_mock()
            # Inject CloseOk response from broker
            transport_mock().read_frame.return_value = build_frame_type_1(
                spec.Connection.CloseOk
            )
            # Grab the transport before close() so that the call of its
            # close() method can be checked afterwards
            t = conn.transport
            conn.close()
            frame_writer_mock.assert_called_once_with(
                1, 0, spec.Connection.Close, dumps('BsBB', (0, '', 0, 0)), None
            )
            t.close.assert_called_once_with()

    @patch('amqp.Connection._on_blocked')
    def test_connecion_ignore_methods_during_close(self, on_blocked_mock):
        # Test checking that py-amqp will discard any received methods
        # except Close and Close-OK after sending Connection.Close method
        # to server.
        frame_writer_cls_mock = Mock()
        frame_writer_mock = frame_writer_cls_mock()
        conn = Connection(frame_writer=frame_writer_cls_mock)
        with patch.object(conn, 'Transport') as transport_mock:
            handshake(conn, transport_mock)
            frame_writer_mock.reset_mock()
            # Inject Blocked (which must be discarded) followed by the
            # CloseOk response from broker
            transport_mock().read_frame.side_effect = [
                build_frame_type_1(
                    spec.Connection.Blocked, channel=0
                ),
                build_frame_type_1(
                    spec.Connection.CloseOk
                )
            ]
            # Grab the transport before close() so that the call of its
            # close() method can be checked afterwards
            t = conn.transport
            conn.close()
            on_blocked_mock.assert_not_called()
            frame_writer_mock.assert_called_once_with(
                1, 0, spec.Connection.Close, dumps('BsBB', (0, '', 0, 0)), None
            )
            t.close.assert_called_once_with()

    def test_connection_closed_by_broker(self):
        # Test that the library replies with CloseOk and calls the
        # _on_close_ok() method when the Close method is received from
        # the broker; drain_events() is expected to raise ConnectionError.
        frame_writer_cls_mock = Mock()
        frame_writer_mock = frame_writer_cls_mock()
        with patch.object(Connection, '_on_close_ok') as callback_mock:
            conn = Connection(frame_writer=frame_writer_cls_mock)
            with patch.object(conn, 'Transport') as transport_mock:
                handshake(conn, transport_mock)
                frame_writer_mock.reset_mock()
                # Inject Close response from broker
                transport_mock().read_frame.return_value = build_frame_type_1(
                    spec.Connection.Close,
                    args=(1, False),
                    arg_format='Lb'
                )
                with pytest.raises(ConnectionError):
                    conn.drain_events(0)
                frame_writer_mock.assert_called_once_with(
                    1, 0, spec.Connection.CloseOk, '', None
                )
                callback_mock.assert_called_once_with()

    def test_send_heartbeat(self):
        """The send_heartbeat method writes the expected output."""
        conn = Connection()
        with patch.object(conn, 'Transport') as transport_mock:
            handshake(conn, transport_mock)
            transport_mock().write.reset_mock()
            conn.send_heartbeat()
            transport_mock().write.assert_called_once_with(
                memoryview(bytearray(b'\x08\x00\x00\x00\x00\x00\x00\xce'))
            )


class test_channel:
    # Integration tests verifying the correctness of communication between
    # the library and the broker.
    # * the tests mock broker responses by mocking the return values of
    #   the amqp.transport.Transport.read_frame() method
    # * the tests assert the expected library responses to the broker via
    #   calls of the amqp.method_framing.frame_writer() function

    @pytest.mark.parametrize("method, callback", connection_testdata)
    def test_connection_methods(self, method, callback):
        """A Connection method sent by the broker triggers its callback."""
        with patch.object(Connection, callback) as patched_callback:
            connection = Connection()
            with patch.object(connection, 'Transport') as transport_mock:
                handshake(connection, transport_mock)
                # Frame the broker "sends" on channel 0.
                incoming = build_frame_type_1(
                    method, channel=0, args=(1, False), arg_format='Lb'
                )
                transport_mock().read_frame.return_value = incoming
                connection.drain_events(0)
                patched_callback.assert_called_once()

    def test_channel_ignore_methods_during_close(self):
        """Received methods are discarded once Channel.Close was sent.

        Only Close and Close-OK are expected to be processed after the
        client sent Channel.Close, so the Basic.Deliver injected here
        (with its two content frames) must not reach
        ``_on_basic_deliver``.
        """
        channel_id = 1
        consumer_tag = 'amq.ctag-PCmzXGkhCw_v0Zq7jXyvkg'
        writer_factory = Mock()
        connection = Connection(frame_writer=writer_factory)
        with patch.object(connection, 'Transport') as transport_mock:
            handshake(connection, transport_mock)

            # Reply to Channel.Open
            open_ok = build_frame_type_1(
                spec.Channel.OpenOk,
                channel=channel_id,
                args=(1, False),
                arg_format='Lb'
            )
            # Basic.Deliver plus the two content frames that follow it
            deliver = build_frame_type_1(
                spec.Basic.Deliver,
                channel=1,
                arg_format='sLbss',
                args=(
                    # consumer-tag, delivery-tag, redelivered,
                    consumer_tag, 1, False,
                    # exchange-name, routing-key
                    'foo_exchange', 'routing-key'
                )
            )
            content_header = build_frame_type_2(
                channel=1,
                body_len=12,
                properties=b'0\x00\x00\x00\x00\x00\x01'
            )
            content_body = build_frame_type_3(
                channel=1,
                body=b'Hello World!'
            )
            # Reply to Channel.Close
            close_ok = build_frame_type_1(
                spec.Channel.CloseOk,
                channel=channel_id
            )
            transport_mock().read_frame.side_effect = [
                open_ok, deliver, content_header, content_body, close_ok,
            ]

            writer = writer_factory()
            writer.reset_mock()

            with patch('amqp.Channel._on_basic_deliver') as on_deliver_mock:
                channel = connection.channel(channel_id=channel_id)
                channel.close()
                on_deliver_mock.assert_not_called()

            expected_writes = [
                call(1, 1, spec.Channel.Open, dumps('s', ('',)), None),
                call(
                    1, 1, spec.Channel.Close, dumps('BsBB', (0, '', 0, 0)),
                    None
                ),
            ]
            writer.assert_has_calls(expected_writes)
            assert channel.is_open is False

    def test_channel_open_close(self):
        """Opening and closing a channel writes Open and Close frames.

        Also checks the ``callback`` passed to ``conn.channel()``, the
        ``is_open`` flag and the connection's ``_used_channel_ids``.
        """
        channel_id = 1
        writer_factory = Mock()
        connection = Connection(frame_writer=writer_factory)
        with patch.object(connection, 'Transport') as transport_mock:
            handshake(connection, transport_mock)

            # Broker replies: OpenOk for the open, CloseOk for the close
            open_ok = build_frame_type_1(
                spec.Channel.OpenOk,
                channel=channel_id,
                args=(1, False),
                arg_format='Lb'
            )
            close_ok = build_frame_type_1(
                spec.Channel.CloseOk,
                channel=channel_id
            )
            transport_mock().read_frame.side_effect = [open_ok, close_ok]

            writer = writer_factory()
            writer.reset_mock()

            opened_callback = Mock()
            assert connection._used_channel_ids == array('H')
            channel = connection.channel(
                channel_id=channel_id, callback=opened_callback
            )
            opened_callback.assert_called_once_with(channel)
            assert channel.is_open is True
            assert connection._used_channel_ids == array('H', (1,))

            channel.close()
            expected_writes = [
                call(1, 1, spec.Channel.Open, dumps('s', ('',)), None),
                call(
                    1, 1, spec.Channel.Close, dumps('BsBB', (0, '', 0, 0)),
                    None
                ),
            ]
            writer.assert_has_calls(expected_writes)
            assert channel.is_open is False
            assert connection._used_channel_ids == array('H')

    def test_received_channel_Close_during_connection_close(self):
        """Channel.Close arriving while the connection closes is handled.

        Scenario (see GitHub issue #218):
        1. User requests closing connection - client sends Connection.Close
        2. Broker requests closing Channel - client receives Channel.Close
        3. Broker sends Connection.CloseOk

        The test has no explicit assertion; it passes when ``conn.close()``
        returns without raising.
        """
        channel_id = 1
        connection = Connection()
        with patch.object(connection, 'Transport') as transport_mock:
            handshake(connection, transport_mock)
            create_channel(channel_id, connection, transport_mock)

            # Replies sent by broker
            channel_close = build_frame_type_1(
                spec.Channel.Close,
                channel=channel_id,
                args=(1, False),
                arg_format='Lb'
            )
            connection_close_ok = build_frame_type_1(spec.Connection.CloseOk)
            transport_mock().read_frame.side_effect = [
                channel_close, connection_close_ok,
            ]
            connection.close()

    @pytest.mark.parametrize("method, callback", channel_testdata)
    def test_channel_methods(self, method, callback):
        """A Channel method sent by the broker triggers its callback."""
        with patch.object(Channel, callback) as patched_callback:
            connection = Connection()
            with patch.object(connection, 'Transport') as transport_mock:
                handshake(connection, transport_mock)
                create_channel(1, connection, transport_mock)

                # Frame the broker "sends" on channel 1.
                incoming = build_frame_type_1(
                    method,
                    channel=1,
                    args=(1, False),
                    arg_format='Lb'
                )
                transport_mock().read_frame.return_value = incoming
                connection.drain_events(0)
                patched_callback.assert_called_once()

    def test_basic_publish(self):
        """Publishing a message writes a single Basic.Publish frame."""
        writer_factory = Mock()
        connection = Connection(frame_writer=writer_factory)
        with patch.object(connection, 'Transport') as transport_mock:
            handshake(connection, transport_mock)
            channel = create_channel(1, connection, transport_mock)

            writer = writer_factory()
            writer.reset_mock()
            message = Message('test')
            # we need to mock socket timeout due checks in
            # Channel._basic_publish
            transport_mock().read_frame.side_effect = socket.timeout
            channel.basic_publish(message)

            expected_args = dumps('Bssbb', (0, '', '', False, False))
            writer.assert_called_once_with(
                1, 1, spec.Basic.Publish, expected_args, message
            )

    def test_consume_no_consumer_tag(self):
        """Consuming without a consumer_tag uses the tag from ConsumeOk."""
        consumer_tag = 'amq.ctag-PCmzXGkhCw_v0Zq7jXyvkg'
        on_message = Mock()
        writer_factory = Mock()
        connection = Connection(frame_writer=writer_factory)
        with patch.object(connection, 'Transport') as transport_mock:
            handshake(connection, transport_mock)
            channel = create_channel(1, connection, transport_mock)

            # ConsumeOk response from the broker carrying the tag
            consume_ok = build_frame_type_1(
                spec.Basic.ConsumeOk,
                channel=1,
                args=(consumer_tag,),
                arg_format='s'
            )
            transport_mock().read_frame.return_value = consume_ok
            writer = writer_factory()
            writer.reset_mock()

            returned_tag = channel.basic_consume(
                'my_queue', callback=on_message
            )

            expected_args = dumps(
                'BssbbbbF',
                (0, 'my_queue', '', False, False, False, False, None)
            )
            writer.assert_called_once_with(
                1, 1, spec.Basic.Consume, expected_args, None
            )
            assert channel.callbacks[consumer_tag] == on_message
            assert returned_tag == 'amq.ctag-PCmzXGkhCw_v0Zq7jXyvkg'

    def test_consume_with_consumer_tag(self):
        """Consuming with an explicit consumer_tag sends and keeps it."""
        on_message = Mock()
        writer_factory = Mock()
        connection = Connection(frame_writer=writer_factory)
        with patch.object(connection, 'Transport') as transport_mock:
            handshake(connection, transport_mock)
            channel = create_channel(1, connection, transport_mock)

            # ConsumeOk response from the broker echoing the tag
            consume_ok = build_frame_type_1(
                spec.Basic.ConsumeOk,
                channel=1,
                args=('my_tag',),
                arg_format='s'
            )
            transport_mock().read_frame.return_value = consume_ok
            writer = writer_factory()
            writer.reset_mock()

            returned_tag = channel.basic_consume(
                'my_queue', callback=on_message, consumer_tag='my_tag'
            )

            expected_args = dumps(
                'BssbbbbF',
                (0, 'my_queue', 'my_tag', False, False, False, False, None)
            )
            writer.assert_called_once_with(
                1, 1, spec.Basic.Consume, expected_args, None
            )
            assert channel.callbacks['my_tag'] == on_message
            assert returned_tag == 'my_tag'

    def test_queue_declare(self):
        """Declaring a queue returns the values from Queue.DeclareOk."""
        writer_factory = Mock()
        connection = Connection(frame_writer=writer_factory)
        with patch.object(connection, 'Transport') as transport_mock:
            handshake(connection, transport_mock)
            channel = create_channel(1, connection, transport_mock)

            declare_ok = build_frame_type_1(
                spec.Queue.DeclareOk,
                channel=1,
                arg_format='sll',
                args=('foo', 1, 2)
            )
            transport_mock().read_frame.return_value = declare_ok
            writer = writer_factory()
            writer.reset_mock()

            result = channel.queue_declare('foo')

            expected_result = queue_declare_ok_t(
                queue='foo', message_count=1, consumer_count=2
            )
            assert result == expected_result
            expected_args = dumps(
                'BsbbbbbF',
                (
                    0,
                    # queue, passive, durable, exclusive,
                    'foo', False, False, False,
                    # auto_delete, nowait, arguments
                    True, False, None
                )
            )
            writer.assert_called_once_with(
                1, 1, spec.Queue.Declare, expected_args, None
            )

    @pytest.mark.parametrize(
        "reply_code, reply_text, exception", queue_declare_error_testdata)
    def test_queue_declare_error(self, reply_code, reply_text, exception):
        """A Connection.Close reply to queue_declare raises the error.

        NOTE(review): the injected Connection.Close frame names
        Exchange.Declare as the failing method even though the call under
        test is ``queue_declare``; the assertions below check exactly what
        the frame carries -- confirm whether Queue.Declare was intended.
        """
        writer_factory = Mock()
        connection = Connection(frame_writer=writer_factory)
        with patch.object(connection, 'Transport') as transport_mock:
            handshake(connection, transport_mock)
            channel = create_channel(1, connection, transport_mock)

            close_args = (reply_code, reply_text) + spec.Exchange.Declare
            connection_close = build_frame_type_1(
                spec.Connection.Close,
                args=close_args,
                arg_format='BsBB'
            )
            transport_mock().read_frame.return_value = connection_close
            writer = writer_factory()
            writer.reset_mock()

            with pytest.raises(exception) as excinfo:
                channel.queue_declare('foo')

            error = excinfo.value
            assert error.code == reply_code
            assert error.message == reply_text
            assert error.method == 'Exchange.declare'
            assert error.method_name == 'Exchange.declare'
            assert error.method_sig == spec.Exchange.Declare

            # Client is sending to broker:
            # 1. Queue.Declare
            # 2. Connection.CloseOk as reply to received Connection.Close
            declare_args = dumps(
                'BsbbbbbF',
                (
                    0,
                    # queue, passive, durable, exclusive,
                    'foo', False, False, False,
                    # auto_delete, nowait, arguments
                    True, False, None
                )
            )
            expected_writes = [
                call(1, 1, spec.Queue.Declare, declare_args, None),
                call(1, 0, spec.Connection.CloseOk, '', None),
            ]
            writer.assert_has_calls(expected_writes)

    def test_queue_delete(self):
        """Deleting a queue returns the count from Queue.DeleteOk."""
        writer_factory = Mock()
        connection = Connection(frame_writer=writer_factory)
        with patch.object(connection, 'Transport') as transport_mock:
            handshake(connection, transport_mock)
            channel = create_channel(1, connection, transport_mock)

            delete_ok = build_frame_type_1(
                spec.Queue.DeleteOk,
                channel=1,
                arg_format='l',
                args=(5,)
            )
            transport_mock().read_frame.return_value = delete_ok
            writer = writer_factory()
            writer.reset_mock()

            deleted_messages = channel.queue_delete('foo')

            assert deleted_messages == 5
            # queue, if_unused, if_empty, nowait
            expected_args = dumps('Bsbbb', (0, 'foo', False, False, False))
            writer.assert_called_once_with(
                1, 1, spec.Queue.Delete, expected_args, None
            )

    def test_queue_purge(self):
        """Purging a queue returns the count from Queue.PurgeOk."""
        writer_factory = Mock()
        connection = Connection(frame_writer=writer_factory)
        with patch.object(connection, 'Transport') as transport_mock:
            handshake(connection, transport_mock)
            channel = create_channel(1, connection, transport_mock)

            purge_ok = build_frame_type_1(
                spec.Queue.PurgeOk,
                channel=1,
                arg_format='l',
                args=(4,)
            )
            transport_mock().read_frame.return_value = purge_ok
            writer = writer_factory()
            writer.reset_mock()

            purged_messages = channel.queue_purge('foo')

            assert purged_messages == 4
            # queue, nowait
            expected_args = dumps('Bsb', (0, 'foo', False))
            writer.assert_called_once_with(
                1, 1, spec.Queue.Purge, expected_args, None
            )

    def test_basic_deliver(self):
        """A single delivered message is handed to the consumer callback.

        The broker side is simulated by a ConsumeOk frame followed by
        Basic.Deliver and the two content frames of the message.
        """
        consumer_tag = 'amq.ctag-PCmzXGkhCw_v0Zq7jXyvkg'
        on_message = Mock()
        writer_factory = Mock()
        connection = Connection(frame_writer=writer_factory)
        with patch.object(connection, 'Transport') as transport_mock:
            handshake(connection, transport_mock)
            channel = create_channel(1, connection, transport_mock)

            # Reply to Basic.Consume
            consume_ok = build_frame_type_1(
                spec.Basic.ConsumeOk,
                channel=1,
                args=(consumer_tag,),
                arg_format='s'
            )
            # Basic.Deliver plus the two content frames that follow it
            deliver = build_frame_type_1(
                spec.Basic.Deliver,
                channel=1,
                arg_format='sLbss',
                args=(
                    # consumer-tag, delivery-tag, redelivered,
                    consumer_tag, 1, False,
                    # exchange-name, routing-key
                    'foo_exchange', 'routing-key'
                )
            )
            content_header = build_frame_type_2(
                channel=1,
                body_len=12,
                properties=b'0\x00\x00\x00\x00\x00\x01'
            )
            content_body = build_frame_type_3(
                channel=1,
                body=b'Hello World!'
            )
            transport_mock().read_frame.side_effect = [
                consume_ok, deliver, content_header, content_body,
            ]

            writer = writer_factory()
            writer.reset_mock()
            channel.basic_consume('my_queue', callback=on_message)
            connection.drain_events()

            on_message.assert_called_once_with(ANY)
            positional_args = on_message.call_args[0]
            received = positional_args[0]
            assert isinstance(received, Message)
            assert received.body_size == 12
            assert received.body == b'Hello World!'
            assert received.frame_method == spec.Basic.Deliver
            assert received.delivery_tag == 1
            assert received.ready is True

            expected_delivery_info = {
                'consumer_tag': 'amq.ctag-PCmzXGkhCw_v0Zq7jXyvkg',
                'delivery_tag': 1,
                'redelivered': False,
                'exchange': 'foo_exchange',
                'routing_key': 'routing-key'
            }
            assert received.delivery_info == expected_delivery_info
            expected_properties = {
                'application_headers': {}, 'delivery_mode': 1
            }
            assert received.properties == expected_properties

    def test_queue_get(self):
        """Verify basic_get() returns the message fetched from a queue.

        The mocked transport replays Basic.GetOk followed by the content
        header and body frames; the test also checks the Basic.Get frame
        handed to the frame writer.
        """
        writer_cls = Mock()
        conn = Connection(frame_writer=writer_cls)
        with patch.object(conn, 'Transport') as transport_mock:
            handshake(conn, transport_mock)
            ch = create_channel(1, conn, transport_mock)

            get_ok = build_frame_type_1(
                spec.Basic.GetOk,
                channel=1,
                arg_format='Lbssl',
                # delivery_tag, redelivered, exchange_name,
                # routing_key, message_count
                args=(1, False, 'foo_exchange', 'routing_key', 1),
            )
            header = build_frame_type_2(
                channel=1,
                body_len=12,
                properties=b'0\x00\x00\x00\x00\x00\x01',
            )
            body = build_frame_type_3(channel=1, body=b'Hello World!')
            transport_mock().read_frame.side_effect = [get_ok, header, body]

            # Clear the calls recorded on the frame writer so far, so that
            # only the Basic.Get request is checked below.
            writer = writer_cls()
            writer.reset_mock()

            msg = ch.basic_get('foo')

            assert msg.body_size == 12
            assert msg.body == b'Hello World!'
            assert msg.frame_method == spec.Basic.GetOk
            assert msg.delivery_tag == 1
            assert msg.ready is True
            expected_delivery_info = {
                'delivery_tag': 1, 'redelivered': False,
                'exchange': 'foo_exchange',
                'routing_key': 'routing_key', 'message_count': 1
            }
            assert msg.delivery_info == expected_delivery_info
            expected_properties = {
                'application_headers': {}, 'delivery_mode': 1
            }
            assert msg.properties == expected_properties

            # queue, nowait (preceded by a leading 0)
            expected_payload = dumps('Bsb', (0, 'foo', False))
            writer.assert_called_once_with(
                1, 1, spec.Basic.Get, expected_payload, None
            )

    def test_queue_get_empty(self):
        """Verify basic_get() returns None when the queue is empty.

        The mocked transport answers with Basic.GetEmpty; the test also
        checks the Basic.Get frame handed to the frame writer.
        """
        frame_writer_cls_mock = Mock()
        conn = Connection(frame_writer=frame_writer_cls_mock)
        with patch.object(conn, 'Transport') as transport_mock:
            handshake(conn, transport_mock)
            ch = create_channel(1, conn, transport_mock)
            transport_mock().read_frame.return_value = build_frame_type_1(
                spec.Basic.GetEmpty,
                channel=1,
                arg_format='s',
                # One-element tuple: the trailing comma is required,
                # ('s') would be a plain string rather than a tuple.
                args=('s',)
            )
            # Clear the calls recorded on the frame writer so far, so that
            # only the Basic.Get request is checked below.
            frame_writer_mock = frame_writer_cls_mock()
            frame_writer_mock.reset_mock()
            ret = ch.basic_get('foo')
            assert ret is None
            frame_writer_mock.assert_called_once_with(
                1, 1, spec.Basic.Get,
                dumps(
                    'Bsb',
                    # queue, nowait
                    (0, 'foo', False)
                ),
                None
            )

    def test_exchange_declare(self):
        """Verify exchange_declare() sends Exchange.Declare, returns None.

        The mocked transport answers with Exchange.DeclareOk.
        """
        writer_cls = Mock()
        conn = Connection(frame_writer=writer_cls)
        with patch.object(conn, 'Transport') as transport_mock:
            handshake(conn, transport_mock)
            ch = create_channel(1, conn, transport_mock)

            declare_ok = build_frame_type_1(
                spec.Exchange.DeclareOk, channel=1
            )
            transport_mock().read_frame.return_value = declare_ok

            # Clear the calls recorded on the frame writer so far, so that
            # only the Exchange.Declare request is checked below.
            writer = writer_cls()
            writer.reset_mock()

            result = ch.exchange_declare('foo', 'fanout')
            assert result is None

            declare_args = (
                0,
                'foo',      # exchange
                'fanout',   # type
                False,      # passive
                False,      # durable
                True,       # auto_delete
                False,      # internal
                False,      # nowait
                None,       # arguments
            )
            expected_payload = dumps('BssbbbbbF', declare_args)
            writer.assert_called_once_with(
                1, 1, spec.Exchange.Declare, expected_payload, None
            )

    @pytest.mark.parametrize(
        "reply_code, reply_text, exception", exchange_declare_error_testdata)
    def test_exchange_declare_error(self, reply_code, reply_text, exception):
        """Verify a rejected exchange declaration raises the mapped error.

        The mocked transport answers the declaration with Connection.Close
        carrying ``reply_code``/``reply_text`` and the Exchange.Declare
        method signature; ``exception`` is the error type expected to be
        raised by exchange_declare().
        """
        writer_cls = Mock()
        conn = Connection(frame_writer=writer_cls)
        with patch.object(conn, 'Transport') as transport_mock:
            handshake(conn, transport_mock)
            ch = create_channel(1, conn, transport_mock)

            close_frame = build_frame_type_1(
                spec.Connection.Close,
                arg_format='BsBB',
                args=(reply_code, reply_text, *spec.Exchange.Declare),
            )
            transport_mock().read_frame.return_value = close_frame

            # Clear the calls recorded on the frame writer so far
            writer = writer_cls()
            writer.reset_mock()

            with pytest.raises(exception) as excinfo:
                ch.exchange_declare('exchange', 'exchange-type')

            error = excinfo.value
            assert error.code == reply_code
            assert error.message == reply_text
            assert error.method == 'Exchange.declare'
            assert error.method_name == 'Exchange.declare'
            assert error.method_sig == spec.Exchange.Declare

            # The client is expected to send two frames to the broker:
            # 1. Exchange.Declare
            # 2. Connection.CloseOk as reply to the received
            #    Connection.Close
            declare_args = (
                0,
                'exchange',        # exchange
                'exchange-type',   # type
                False,             # passive
                False,             # durable
                True,              # auto_delete
                False,             # internal
                False,             # nowait
                None,              # arguments
            )
            declare_call = call(
                1, 1, spec.Exchange.Declare,
                dumps('BssbbbbbF', declare_args),
                None
            )
            close_ok_call = call(1, 0, spec.Connection.CloseOk, '', None)
            writer.assert_has_calls([declare_call, close_ok_call])

    def test_exchange_delete(self):
        """Verify exchange_delete() sends Exchange.Delete to the broker.

        The mocked transport answers with Exchange.DeleteOk; the call is
        expected to return an empty tuple.
        """
        writer_cls = Mock()
        conn = Connection(frame_writer=writer_cls)
        with patch.object(conn, 'Transport') as transport_mock:
            handshake(conn, transport_mock)
            ch = create_channel(1, conn, transport_mock)

            delete_ok = build_frame_type_1(
                spec.Exchange.DeleteOk, channel=1
            )
            transport_mock().read_frame.return_value = delete_ok

            # Clear the calls recorded on the frame writer so far, so that
            # only the Exchange.Delete request is checked below.
            writer = writer_cls()
            writer.reset_mock()

            result = ch.exchange_delete('foo')
            assert result == ()

            delete_args = (
                0,
                'foo',    # exchange
                False,    # if-unused
                False,    # no-wait
            )
            expected_payload = dumps('Bsbb', delete_args)
            writer.assert_called_once_with(
                1, 1, spec.Exchange.Delete, expected_payload, None
            )
