import pickle
from datetime import datetime
from decimal import Decimal
from math import ceil
from struct import pack

import pytest

from amqp.basic_message import Message
from amqp.exceptions import FrameSyntaxError
from amqp.serialization import GenericContent, _read_item, dumps, loads


class _ANY:

    def __eq__(self, other):
        return other is not None

    def __ne__(self, other):
        return other is None


class test_serialization:

    @pytest.mark.parametrize('descr,frame,expected,cast', [
        ('S', b's8thequick', 'thequick', None),
        ('S', b'S\x00\x00\x00\x03\xc0\xc0\x00', b'\xc0\xc0\x00', None),
        ('x', b'x\x00\x00\x00\x09thequick\xffIGNORED', b'thequick\xff', None),
        ('b', b'b' + pack('>B', True), True, None),
        ('B', b'B' + pack('>b', 123), 123, None),
        ('U', b'U' + pack('>h', -321), -321, None),
        ('u', b'u' + pack('>H', 321), 321, None),
        ('i', b'i' + pack('>I', 1234), 1234, None),
        ('L', b'L' + pack('>q', -32451), -32451, None),
        ('l', b'l' + pack('>Q', 32451), 32451, None),
        ('f', b'f' + pack('>f', 33.3), 34.0, ceil),
    ])
    def test_read_item(self, descr, frame, expected, cast):
        actual = _read_item(frame, 0)[0]
        actual = cast(actual) if cast else actual
        assert actual == expected

    def test_read_item_V(self):
        assert _read_item(b'V', 0)[0] is None

    def test_roundtrip(self):
        format = b'bobBlLbsbSTx'
        x = dumps(format, [
            True, 32, False, 3415, 4513134, 13241923419,
            True, b'thequickbrownfox', False, 'jumpsoverthelazydog',
            datetime(2015, 3, 13, 10, 23),
            b'thequick\xff'
        ])
        y = loads(format, x, 0)
        assert [
            True, 32, False, 3415, 4513134, 13241923419,
            True, 'thequickbrownfox', False, 'jumpsoverthelazydog',
            datetime(2015, 3, 13, 10, 23), b'thequick\xff'
        ] == y[0]

    def test_int_boundaries(self):
        format = b'F'
        x = dumps(format, [
            {'a': -2147483649, 'b': 2147483648},  # celery/celery#3121
        ])
        y = loads(format, x, 0)
        assert y[0] == [{
            'a': -2147483649, 'b': 2147483648,  # celery/celery#3121
        }]

    def test_loads_unknown_type(self):
        with pytest.raises(FrameSyntaxError):
            loads('y', 'asdsad', 0)

    def test_float(self):
        data = int(loads(b'fb', dumps(b'fb', [32.31, False]), 0)[0][0] * 100)
        assert(data == 3231)

    def test_table(self):
        table = {
            'foo': 32,
            'bar': 'baz',
            'nil': None,
            'array': [
                1, True, 'bar'
            ]
        }
        assert loads(b'F', dumps(b'F', [table]), 0)[0][0] == table

    def test_table__unknown_type(self):
        table = {
            'foo': object(),
            'bar': 'baz',
            'nil': None,
            'array': [
                1, True, 'bar'
            ]
        }
        with pytest.raises(FrameSyntaxError):
            dumps(b'F', [table])

    def test_array(self):
        array = [
            'A', 1, True, 33.3,
            Decimal('55.5'), Decimal('-3.4'),
            datetime(2015, 3, 13, 10, 23),
            {'quick': 'fox', 'amount': 1},
            [3, 'hens'],
            None,
        ]
        expected = list(array)
        expected[6] = _ANY()

        assert expected == loads('A', dumps('A', [array]), 0)[0][0]

    def test_array_unknown_type(self):
        with pytest.raises(FrameSyntaxError):
            dumps('A', [[object()]])

    def test_bit_offset_adjusted_correctly(self):
        expected = [50, "quick", "fox", True,
                    False, False, True, True, {"prop1": True}]
        buf = dumps('BssbbbbbF', expected)
        actual, _ = loads('BssbbbbbF', buf, 0)
        assert actual == expected

    def test_sixteen_bitflags(self):
        expected = [True, False] * 8
        format = 'b' * len(expected)
        buf = dumps(format, expected)
        actual, _ = loads(format, buf, 0)
        assert actual == expected


class test_GenericContent:

    @pytest.fixture(autouse=True)
    def setup_content(self):
        self.g = GenericContent()

    def test_getattr(self):
        self.g.properties['foo'] = 30
        assert self.g.foo == 30
        with pytest.raises(AttributeError):
            self.g.bar

    def test_pickle(self):
        pickle.loads(pickle.dumps(self.g))

    def test_load_properties(self):
        m = Message()
        m.properties = {
            'content_type': 'application/json',
            'content_encoding': 'utf-8',
            'application_headers': {
                'foo': 1,
                'id': 'id#1',
            },
            'delivery_mode': 1,
            'priority': 255,
            'correlation_id': 'df31-142f-34fd-g42d',
            'reply_to': 'cosmo',
            'expiration': '2015-12-23',
            'message_id': '3312',
            'timestamp': 3912491234,
            'type': 'generic',
            'user_id': 'george',
            'app_id': 'vandelay',
            'cluster_id': 'NYC',
        }
        s = m._serialize_properties()
        m2 = Message()
        m2._load_properties(m2.CLASS_ID, s, 0)
        assert m2.properties == m.properties

    def test_load_properties__some_missing(self):
        m = Message()
        m.properties = {
            'content_type': 'application/json',
            'content_encoding': 'utf-8',
            'delivery_mode': 1,
            'correlation_id': 'df31-142f-34fd-g42d',
            'reply_to': 'cosmo',
            'expiration': '2015-12-23',
            'message_id': '3312',
            'type': None,
            'app_id': None,
            'cluster_id': None,
        }
        s = m._serialize_properties()
        m2 = Message()
        m2._load_properties(m2.CLASS_ID, s, 0)

    def test_inbound_header(self):
        m = Message()
        m.properties = {
            'content_type': 'application/json',
            'content_encoding': 'utf-8',
        }
        body = 'the quick brown fox'
        buf = b'\0' * 30 + pack('>HxxQ', m.CLASS_ID, len(body))
        buf += m._serialize_properties()
        assert m.inbound_header(buf, offset=30) == 42
        assert m.body_size == len(body)
        assert m.properties['content_type'] == 'application/json'
        assert not m.ready

    def test_inbound_header__empty_body(self):
        m = Message()
        m.properties = {}
        buf = pack('>HxxQ', m.CLASS_ID, 0)
        buf += m._serialize_properties()
        assert m.inbound_header(buf, offset=0) == 12
        assert m.ready

    def test_inbound_body(self):
        m = Message()
        m.body_size = 16
        m.body_received = 8
        m._pending_chunks = [b'the', b'quick']
        m.inbound_body(b'brown')
        assert not m.ready
        m.inbound_body(b'fox')
        assert m.ready
        assert m.body == b'thequickbrownfox'

    def test_inbound_body__no_chunks(self):
        m = Message()
        m.body_size = 16
        m.inbound_body('thequickbrownfox')
        assert m.ready
