#-------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
#--------------------------------------------------------------------------

import logging
import os
import pytest
import asyncio
import sys
import time

import uamqp
from uamqp import address, types, utils, authentication, MessageBodyType
from uamqp.message import DataBody, ValueBody, SequenceBody


def get_logger(level):
    """Return the ``uamqp`` logger set to ``level``.

    A stdout stream handler with a timestamped format is attached only when
    the logger has no handlers yet, so repeated calls never stack handlers.

    :param level: Logging level applied to the ``uamqp`` logger.
    :returns: The ``uamqp`` logger instance.
    """
    logger = logging.getLogger("uamqp")
    if logger.handlers:
        # Already wired up by an earlier call: only the level is refreshed.
        logger.setLevel(level)
        return logger

    formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
    console = logging.StreamHandler(stream=sys.stdout)
    console.setFormatter(formatter)
    logger.addHandler(console)
    logger.setLevel(level)
    return logger


# Module-level logger used by the helpers and tests in this file (INFO level).
log = get_logger(logging.INFO)


def on_message_received(message):
    """Log the ``x-opt-sequence-number`` annotation of ``message``.

    :param message: Received message exposing an ``annotations`` mapping.
    :returns: The same ``message`` object, unchanged.
    """
    sequence_number = message.annotations.get(b'x-opt-sequence-number')
    log.info("Sequence Number: {}".format(sequence_number))
    return message


def send_single_message(live_eventhub_config, partition, msg_content):
    """Send one message to a specific partition using SAS token auth.

    :param live_eventhub_config: Mapping with ``hostname``, ``event_hub``,
        ``key_name`` and ``access_key`` entries.
    :param partition: Partition identifier appended to the target address.
    :param msg_content: Message payload handed to ``uamqp.send_message``.
    """
    hostname = live_eventhub_config['hostname']
    event_hub = live_eventhub_config['event_hub']
    target = "amqps://{}/{}/Partitions/{}".format(hostname, event_hub, partition)
    uri = "sb://{}/{}".format(hostname, event_hub)

    key_name = live_eventhub_config['key_name']
    access_key = live_eventhub_config['access_key']
    sas_auth = authentication.SASTokenAuth.from_shared_access_key(uri, key_name, access_key)

    uamqp.send_message(target, msg_content, auth=sas_auth, debug=False)


def send_multiple_message(live_eventhub_config, msg_count):
    """Send ``msg_count`` messages as one batch to the configured partition.

    :param live_eventhub_config: Mapping with ``hostname``, ``event_hub``,
        ``key_name``, ``access_key`` and ``partition`` entries.
    :param msg_count: Number of ``"Hello world <i>"`` payloads to generate.
    :raises AssertionError: If any send result is ``MessageState.SendFailed``.
    """
    def data_generator():
        # Lazily produce the UTF-8 encoded payloads consumed by BatchMessage.
        for i in range(msg_count):
            yield "Hello world {}".format(i).encode('utf-8')

    uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
    sas_auth = authentication.SASTokenAuth.from_shared_access_key(
        uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'])

    target = "amqps://{}/{}/Partitions/{}".format(
        live_eventhub_config['hostname'],
        live_eventhub_config['event_hub'],
        live_eventhub_config['partition'])
    send_client = uamqp.SendClient(target, auth=sas_auth, debug=False)
    try:
        message_batch = uamqp.message.BatchMessage(data_generator())
        send_client.queue_message(message_batch)
        # close_on_done=False keeps the client open, so it is closed explicitly below.
        results = send_client.send_all_messages(close_on_done=False)
        assert not any(m == uamqp.constants.MessageState.SendFailed for m in results)
    finally:
        # Close even when sending or the assertion fails, so the client is not leaked.
        send_client.close()


@pytest.mark.asyncio
async def test_event_hubs_callback_async_receive(live_eventhub_config):
    """Receive from the configured partition, passing each message to ``on_message_received``."""
    hostname = live_eventhub_config['hostname']
    event_hub = live_eventhub_config['event_hub']
    uri = "sb://{}/{}".format(hostname, event_hub)
    sas_auth = authentication.SASTokenAsync.from_shared_access_key(
        uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'])
    source = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format(
        hostname,
        event_hub,
        live_eventhub_config['consumer_group'],
        live_eventhub_config['partition'])

    client_options = dict(auth=sas_auth, timeout=1000, prefetch=10)
    receive_client = uamqp.ReceiveClientAsync(source, **client_options)
    log.info("Created client, receiving...")
    await receive_client.receive_messages_async(on_message_received)
    log.info("Finished receiving")


@pytest.mark.asyncio
async def test_event_hubs_callback_async_receive_no_shutdown_after_timeout(live_eventhub_config):
    """Callback receive with ``shutdown_after_timeout=False`` keeps the same message handler.

    Two messages are sent one after the other; each ``receive_messages_async``
    call must deliver exactly one of them, and ``message_handler`` must compare
    equal before and after the second call.
    """
    delivered = 0

    def on_message_received_internal(message):
        # Log, settle and count every message handed to the callback.
        nonlocal delivered
        sequence_number = message.annotations.get(b'x-opt-sequence-number')
        log.info("Sequence Number: {}".format(sequence_number))
        log.info(str(message))
        message.accept()
        delivered += 1

    hostname = live_eventhub_config['hostname']
    event_hub = live_eventhub_config['event_hub']
    partition = live_eventhub_config['partition']
    uri = "sb://{}/{}".format(hostname, event_hub)
    sas_auth = authentication.SASTokenAsync.from_shared_access_key(
        uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'])
    source_url = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format(
        hostname, event_hub, live_eventhub_config['consumer_group'], partition)

    source = address.Source(source_url)
    source.set_filter(b"amqp.annotation.x-opt-offset > '@latest'")

    receive_client = uamqp.ReceiveClientAsync(
        source, auth=sas_auth, timeout=3000, prefetch=10, shutdown_after_timeout=False)
    log.info("Created client, receiving...")
    try:
        await receive_client.open_async()
        while True:
            if await receive_client.client_ready_async():
                break
            await asyncio.sleep(0.05)

        await asyncio.sleep(1)  # give the service one second to push anything pending
        # One connection iteration is enough to surface any incoming transfers.
        await receive_client._connection.work_async()

        # Nothing may have arrived before a message is sent below.
        assert not receive_client._was_message_received
        assert receive_client._received_messages.empty()

        # First round: one message sent, one message delivered.
        send_single_message(live_eventhub_config, partition, 'message')
        await receive_client.receive_messages_async(on_message_received_internal)
        handler_before = receive_client.message_handler
        assert delivered == 1

        # Second round: the handler must compare equal to the one seen after round one.
        send_single_message(live_eventhub_config, partition, 'message')
        await receive_client.receive_messages_async(on_message_received_internal)
        handler_after = receive_client.message_handler
        assert handler_before == handler_after
        assert delivered == 2

        log.info("Finished receiving")
    finally:
        await receive_client.close_async()


@pytest.mark.asyncio
async def test_event_hubs_filter_receive_async(live_eventhub_config):
    """Receive via callback using SASL PLAIN auth and an enqueued-time source filter."""
    hostname = live_eventhub_config['hostname']
    plain_auth = authentication.SASLPlain(
        hostname,
        live_eventhub_config['key_name'],
        live_eventhub_config['access_key'])

    source_url = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format(
        hostname,
        live_eventhub_config['event_hub'],
        live_eventhub_config['consumer_group'],
        live_eventhub_config['partition'])
    filtered_source = address.Source(source_url)
    filtered_source.set_filter(b"amqp.annotation.x-opt-enqueuedtimeutc > 1518731960545")

    receive_client = uamqp.ReceiveClientAsync(filtered_source, auth=plain_auth, timeout=5000)
    await receive_client.receive_messages_async(on_message_received)


@pytest.mark.asyncio
async def test_event_hubs_iter_receive_async(live_eventhub_config):
    """Iterate the async message generator twice, stopping after ten messages each time.

    The same generator object is resumed after the first ``break``. The client
    is closed in a ``finally`` block so that a failure during iteration does
    not leave it open.
    """
    uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
    sas_auth = authentication.SASTokenAsync.from_shared_access_key(
        uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'])
    source = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format(
        live_eventhub_config['hostname'],
        live_eventhub_config['event_hub'],
        live_eventhub_config['consumer_group'],
        live_eventhub_config['partition'])

    receive_client = uamqp.ReceiveClientAsync(source, debug=False, auth=sas_auth, timeout=3000, prefetch=10)
    try:
        message_generator = receive_client.receive_messages_iter_async()

        count = 0
        async for message in message_generator:
            log.info("No. {} : {}".format(message.annotations.get(b'x-opt-sequence-number'), message))
            count += 1
            if count >= 10:
                log.info("Got {} messages. Breaking.".format(count))
                message.accept()
                break

        # Resume the very same generator for a second run of up to ten messages.
        count = 0
        async for message in message_generator:
            count += 1
            if count >= 10:
                log.info("Got {} more messages. Shutting down.".format(count))
                message.accept()
                break
    finally:
        await receive_client.close_async()


@pytest.mark.asyncio
async def test_event_hubs_iter_receive_no_shutdown_after_timeout_async(live_eventhub_config):
    """Iterator receive with ``shutdown_after_timeout=False`` keeps the same message handler.

    Each of the two rounds sends a single message and expects the iterator to
    yield exactly one message before it finishes.
    """
    hostname = live_eventhub_config['hostname']
    event_hub = live_eventhub_config['event_hub']
    partition = live_eventhub_config['partition']
    uri = "sb://{}/{}".format(hostname, event_hub)
    sas_auth = authentication.SASTokenAsync.from_shared_access_key(
        uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'])
    source_url = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format(
        hostname, event_hub, live_eventhub_config['consumer_group'], partition)

    source = address.Source(source_url)
    source.set_filter(b"amqp.annotation.x-opt-offset > '@latest'")
    receive_client = uamqp.ReceiveClientAsync(
        source, auth=sas_auth, timeout=5000, debug=False, shutdown_after_timeout=False)
    try:
        await receive_client.open_async()
        while True:
            if await receive_client.client_ready_async():
                break
            await asyncio.sleep(0.05)

        await asyncio.sleep(1)  # give the service one second to push anything pending
        # One connection iteration is enough to surface any incoming transfers.
        await receive_client._connection.work_async()

        # Nothing may have arrived before a message is sent below.
        assert not receive_client._was_message_received
        assert receive_client._received_messages.empty()

        # Round one: the generator is created before the message is sent.
        first_round = receive_client.receive_messages_iter_async()
        send_single_message(live_eventhub_config, partition, 'message')
        first_count = 0
        async for message in first_round:
            log.info(message.annotations.get(b'x-opt-sequence-number'))
            log.info(str(message))
            first_count += 1

        assert first_count == 1

        # Round two: the message is sent before a fresh generator is created.
        handler_before = receive_client.message_handler
        send_single_message(live_eventhub_config, partition, 'message')
        second_round = receive_client.receive_messages_iter_async()
        second_count = 0
        async for message in second_round:
            log.info(message.annotations.get(b'x-opt-sequence-number'))
            log.info(str(message))
            second_count += 1

        assert second_count == 1

        handler_after = receive_client.message_handler
        assert handler_before == handler_after
    finally:
        await receive_client.close_async()


@pytest.mark.asyncio
async def test_event_hubs_batch_receive_async(live_eventhub_config):
    """Fetch three consecutive batches of up to ten messages and log their sequence numbers."""
    hostname = live_eventhub_config['hostname']
    event_hub = live_eventhub_config['event_hub']
    uri = "sb://{}/{}".format(hostname, event_hub)
    sas_auth = authentication.SASTokenAsync.from_shared_access_key(
        uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'])
    source = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format(
        hostname,
        event_hub,
        live_eventhub_config['consumer_group'],
        live_eventhub_config['partition'])

    # One log template per batch: only the first batch uses the shorter wording.
    batch_log_templates = ("got batch: {}", "got another batch: {}", "got another batch: {}")

    async with uamqp.ReceiveClientAsync(source, debug=False, auth=sas_auth, timeout=3000, prefetch=10) as receive_client:
        for template in batch_log_templates:
            batch = await receive_client.receive_message_batch_async(10)
            log.info(template.format(len(batch)))
            for message in batch:
                sequence_number = message.annotations.get(b'x-opt-sequence-number')
                log.info("Sequence Number: {}".format(sequence_number))


@pytest.mark.asyncio
async def test_event_hubs_batch_receive_async_no_shutdown_after_timeout_sync(live_eventhub_config):
    """Batch receive with ``shutdown_after_timeout=False`` keeps the same message handler.

    The running total of received messages is checked after every batch call:
    zero before anything is sent, one after each send, and unchanged by a
    batch call made without a preceding send.
    """
    hostname = live_eventhub_config['hostname']
    event_hub = live_eventhub_config['event_hub']
    partition = live_eventhub_config['partition']
    uri = "sb://{}/{}".format(hostname, event_hub)
    sas_auth = authentication.SASTokenAsync.from_shared_access_key(
        uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'])
    source_url = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format(
        hostname, event_hub, live_eventhub_config['consumer_group'], partition)

    source = address.Source(source_url)
    source.set_filter(b"amqp.annotation.x-opt-offset > '@latest'")

    async with uamqp.ReceiveClientAsync(source, auth=sas_auth, timeout=3000, prefetch=10, shutdown_after_timeout=False) as receive_client:
        # Nothing has been sent yet, so the first batch must be empty.
        total = len(await receive_client.receive_message_batch_async(10))
        assert total == 0

        handler_before = receive_client.message_handler
        send_single_message(live_eventhub_config, partition, 'message')
        total += len(await receive_client.receive_message_batch_async(10))
        assert total == 1

        # No send in between: the total must not move.
        total += len(await receive_client.receive_message_batch_async(10))
        assert total == 1

        send_single_message(live_eventhub_config, partition, 'message')
        total += len(await receive_client.receive_message_batch_async(10))
        handler_after = receive_client.message_handler

        assert handler_before == handler_after
        assert total == 2


@pytest.mark.asyncio
async def test_event_hubs_client_web_socket_async(live_eventhub_config):
    """Receive one batch through the async client using the AMQP-over-WebSocket transport."""
    uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
    sas_auth = authentication.SASTokenAsync.from_shared_access_key(
        uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'],
        transport_type=uamqp.TransportType.AmqpOverWebsocket)

    source = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format(
        live_eventhub_config['hostname'],
        live_eventhub_config['event_hub'],
        live_eventhub_config['consumer_group'],
        live_eventhub_config['partition'])

    async with uamqp.ReceiveClientAsync(source, auth=sas_auth, debug=False, timeout=5000, prefetch=50) as receive_client:
        # Use the awaitable batch API, as the other async tests in this file do;
        # the synchronous receive_message_batch would block the event loop.
        await receive_client.receive_message_batch_async(max_batch_size=10)


@pytest.mark.asyncio
async def test_event_hubs_receive_with_runtime_metric_async(live_eventhub_config):
    """Request the receiver runtime metric capability and check the delivery annotations.

    Every message in the received batch must carry the four runtime-info keys
    in its ``delivery_annotations``.
    """
    hostname = live_eventhub_config['hostname']
    event_hub = live_eventhub_config['event_hub']
    uri = "sb://{}/{}".format(hostname, event_hub)
    sas_auth = authentication.SASTokenAsync.from_shared_access_key(
        uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'])
    source = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format(
        hostname,
        event_hub,
        live_eventhub_config['consumer_group'],
        live_eventhub_config['partition'])

    # Desired capability: a one-element AMQP array holding the metric symbol.
    metric_symbol = types.AMQPSymbol(b'com.microsoft:enable-receiver-runtime-metric')
    desired_capabilities = utils.data_factory(types.AMQPArray([metric_symbol]))

    expected_keys = (
        b'last_enqueued_sequence_number',
        b'last_enqueued_offset',
        b'last_enqueued_time_utc',
        b'runtime_info_retrieval_time_utc',
    )

    async with uamqp.ReceiveClientAsync(source, debug=False, auth=sas_auth, timeout=1000, prefetch=10,
                                        desired_capabilities=desired_capabilities) as receive_client:
        batch = await receive_client.receive_message_batch_async(10)
        log.info("got batch: {}".format(len(batch)))
        for message in batch:
            annotations = message.annotations
            delivery_annotations = message.delivery_annotations
            log.info("Sequence Number: {}".format(annotations.get(b'x-opt-sequence-number')))
            for key in expected_keys:
                assert key in delivery_annotations


@pytest.mark.asyncio
async def test_event_hubs_shared_connection_async(live_eventhub_config):
    """Receive one message from partitions 0 and 1 concurrently over a single shared connection."""
    uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
    sas_auth = authentication.SASTokenAsync.from_shared_access_key(
        uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'])
    source = "amqps://{}/{}/ConsumerGroups/{}/Partitions/".format(
        live_eventhub_config['hostname'],
        live_eventhub_config['event_hub'],
        live_eventhub_config['consumer_group'])

    async with uamqp.ConnectionAsync(live_eventhub_config['hostname'], sas_auth, debug=False) as conn:
        partition_0 = uamqp.ReceiveClientAsync(source + "0", debug=False, auth=sas_auth, timeout=3000, prefetch=10)
        partition_1 = uamqp.ReceiveClientAsync(source + "1", debug=False, auth=sas_auth, timeout=3000, prefetch=10)
        try:
            # Opening inside the try (as test_event_hubs_multiple_receiver_async does)
            # ensures partition_0 is still closed when opening partition_1 fails.
            await partition_0.open_async(connection=conn)
            await partition_1.open_async(connection=conn)
            messages = await asyncio.gather(
                partition_0.receive_message_batch_async(1),
                partition_1.receive_message_batch_async(1))
            assert len(messages[0]) == 1 and len(messages[1]) == 1
        finally:
            await partition_0.close_async()
            await partition_1.close_async()


async def receive_ten(partition, receiver):
    """Request ten single-message batches from ``receiver`` and collect the results.

    :param partition: Partition label, used only in the progress output.
    :param receiver: Object exposing an awaitable ``receive_message_batch_async``.
    :returns: List of every message returned across the ten batch calls.
    """
    collected = []
    for attempt in range(10):
        print("Receiving {} on partition {}".format(attempt, partition))
        batch = await receiver.receive_message_batch_async(1)
        print("Received {} messages on partition {}".format(len(batch), partition))
        collected.extend(batch)
    print("Finished receiving on partition {}".format(partition))
    return collected


@pytest.mark.asyncio
async def test_event_hubs_multiple_receiver_async(live_eventhub_config):
    """Run two independent receivers (partitions 0 and 1) concurrently via ``receive_ten``."""
    hostname = live_eventhub_config['hostname']
    event_hub = live_eventhub_config['event_hub']
    uri = "sb://{}/{}".format(hostname, event_hub)
    # Each receiver gets its own auth object.
    sas_auth_a = authentication.SASTokenAsync.from_shared_access_key(
        uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'])
    sas_auth_b = authentication.SASTokenAsync.from_shared_access_key(
        uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'])
    source = "amqps://{}/{}/ConsumerGroups/{}/Partitions/".format(
        hostname, event_hub, live_eventhub_config['consumer_group'])

    partition_0 = uamqp.ReceiveClientAsync(source + "0", debug=False, auth=sas_auth_a, timeout=3000, prefetch=10)
    partition_1 = uamqp.ReceiveClientAsync(source + "1", debug=False, auth=sas_auth_b, timeout=3000, prefetch=10)
    try:
        await partition_0.open_async()
        await partition_1.open_async()
        messages = await asyncio.gather(
            receive_ten("0", partition_0),
            receive_ten("1", partition_1))
        assert len(messages) == 2
        first_partition, second_partition = messages[0], messages[1]
        assert len(first_partition) >= 10
        assert len(second_partition) >= 10
        print(messages)
    finally:
        await partition_0.close_async()
        await partition_1.close_async()


@pytest.mark.asyncio
async def test_event_hubs_dynamic_issue_link_credit_async(live_eventhub_config):
    """Raise the link credit at runtime and expect every pre-sent message to arrive.

    200 messages are sent first. The receiver starts with ``prefetch=1``; after
    the credit is reset to 200 the connection is pumped for five seconds and
    the callback counter must equal the number of messages sent.
    """
    hostname = live_eventhub_config['hostname']
    event_hub = live_eventhub_config['event_hub']
    uri = "sb://{}/{}".format(hostname, event_hub)
    sas_auth = authentication.SASTokenAsync.from_shared_access_key(
        uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'])
    source = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format(
        hostname,
        event_hub,
        live_eventhub_config['consumer_group'],
        live_eventhub_config['partition'])

    msg_sent_cnt = 200
    send_multiple_message(live_eventhub_config, msg_sent_cnt)

    received_msg_cnt = 0

    def message_received_callback(message):
        # Only count deliveries; the message itself is not inspected.
        nonlocal received_msg_cnt
        received_msg_cnt += 1

    async with uamqp.ReceiveClientAsync(source, debug=True, auth=sas_auth, prefetch=1) as receive_client:

        receive_client._message_received_callback = message_received_callback

        while True:
            if await receive_client.client_ready_async():
                break
            await asyncio.sleep(0.05)

        await asyncio.sleep(1)  # give the service one second to push anything pending
        # One connection iteration is enough to surface any incoming transfers.
        await receive_client._connection.work_async()

        # Nothing may have been received at this point.
        assert not receive_client._was_message_received
        assert receive_client._received_messages.empty()

        await receive_client.message_handler.reset_link_credit_async(msg_sent_cnt)

        # Keep pumping the connection until more than five seconds have elapsed.
        wait_time = 5
        start = time.time()
        elapsed = 0
        while elapsed <= wait_time:
            await receive_client._connection.work_async()
            elapsed = time.time() - start

        assert received_msg_cnt == msg_sent_cnt
        log.info("Finished receiving")


@pytest.mark.asyncio
async def test_event_hubs_not_receive_events_during_connection_establishment_async(live_eventhub_config):
    """No message may be recorded while the client is only being opened.

    After the client reports ready and one connection iteration has run, the
    receive state must still be empty; a subsequent batch call must then
    return at least one message.
    """
    hostname = live_eventhub_config['hostname']
    event_hub = live_eventhub_config['event_hub']
    uri = "sb://{}/{}".format(hostname, event_hub)
    sas_auth = authentication.SASTokenAsync.from_shared_access_key(
        uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'])
    source = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format(
        hostname,
        event_hub,
        live_eventhub_config['consumer_group'],
        live_eventhub_config['partition'])

    receive_client = uamqp.ReceiveClientAsync(source, auth=sas_auth, timeout=1000, debug=False, prefetch=10)
    try:
        await receive_client.open_async()

        while True:
            if await receive_client.client_ready_async():
                break
            await asyncio.sleep(0.05)

        await asyncio.sleep(1)  # give the service one second to push anything pending
        # One connection iteration is enough to surface any incoming transfers.
        await receive_client._connection.work_async()

        # Nothing may have been received at this point.
        assert not receive_client._was_message_received
        assert receive_client._received_messages.empty()

        first_batch = await receive_client.receive_message_batch_async()
        assert len(first_batch) > 0
    finally:
        await receive_client.close_async()


@pytest.mark.asyncio
async def event_hubs_send_different_amqp_body_type_async(live_eventhub_config):
    """Round-trip data, value and sequence AMQP bodies through partition 0.

    Six messages are sent, each tagged with a ``body_type`` application
    property. Received messages carrying that property are checked against the
    body that was sent under the same tag, and every tag must be seen at least
    once. Both clients are closed even when an assertion fails.

    NOTE(review): the name lacks the ``test_`` prefix, so pytest's default
    collection presumably skips this function; it is run from the
    ``__main__`` block of this file.

    :param live_eventhub_config: Mapping with ``hostname``, ``event_hub``,
        ``key_name``, ``access_key`` and ``consumer_group`` entries.
    """
    uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
    sas_auth = authentication.SASTokenAsync.from_shared_access_key(
        uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'])

    target = "amqps://{}/{}/Partitions/0".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
    send_client = uamqp.SendClientAsync(target, auth=sas_auth, debug=False)

    data_body_1 = [b'data1', b'data2']
    data_body_2 = b'data1'
    value_body_1 = [b'data1', -1.23, True, {b'key': b'value'}, [1, False, 1.23, b'4']]
    value_body_2 = {b'key1': {b'sub_key': b'sub_value'}, b'key2': b'value', 3: -1.23}
    sequence_body_1 = [b'data1', -1.23, True, {b'key': b'value'}, [b'a', 1.23, True]]
    sequence_body_2 = [[1, 2, 3], [b'aa', b'bb', b'cc'], [True, False, True], [{b'key1': b'value'}, {b'key2': 123}]]

    def queue_body(body, label, **message_kwargs):
        # Build a message tagged with its label and queue it on the send client.
        # body_type is only passed through when the caller supplies it.
        message = uamqp.message.Message(body=body, **message_kwargs)
        message.application_properties = {'body_type': label}
        send_client.queue_message(message)

    result_dic = {}
    try:
        queue_body(data_body_1, 'data_body_1')
        queue_body(data_body_2, 'data_body_2', body_type=MessageBodyType.Data)
        queue_body(value_body_1, 'value_body_1')
        queue_body(value_body_2, 'value_body_2', body_type=MessageBodyType.Value)
        queue_body(sequence_body_1, 'sequence_body_1', body_type=MessageBodyType.Sequence)
        queue_body(sequence_body_2, 'sequence_body_2', body_type=MessageBodyType.Sequence)

        results = await send_client.send_all_messages_async(close_on_done=False)
        assert not [m for m in results if m == uamqp.constants.MessageState.SendFailed]

        sas_auth = authentication.SASTokenAsync.from_shared_access_key(
            uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'])

        source = "amqps://{}/{}/ConsumerGroups/{}/Partitions/{}".format(
            live_eventhub_config['hostname'],
            live_eventhub_config['event_hub'],
            live_eventhub_config['consumer_group'],
            0)

        receive_client = uamqp.ReceiveClientAsync(source, auth=sas_auth, timeout=5000, debug=False, prefetch=10)
        try:
            async for message in receive_client.receive_messages_iter_async():
                properties = message.application_properties
                body_type = properties.get(b'body_type') if properties else None
                if not body_type:
                    # Messages without the tag are ignored.
                    continue

                if body_type == b'data_body_1':
                    assert isinstance(message._body, DataBody)
                    assert list(message.get_data()) == data_body_1
                    result_dic['data_body_1'] = 1
                elif body_type == b'data_body_2':
                    assert isinstance(message._body, DataBody)
                    assert list(message.get_data()) == [data_body_2]
                    result_dic['data_body_2'] = 1
                elif body_type == b'value_body_1':
                    assert message.get_data() == value_body_1
                    assert isinstance(message._body, ValueBody)
                    result_dic['value_body_1'] = 1
                elif body_type == b'value_body_2':
                    assert message.get_data() == value_body_2
                    assert isinstance(message._body, ValueBody)
                    result_dic['value_body_2'] = 1
                elif body_type == b'sequence_body_1':
                    assert list(message.get_data()) == [sequence_body_1]
                    assert isinstance(message._body, SequenceBody)
                    result_dic['sequence_body_1'] = 1
                elif body_type == b'sequence_body_2':
                    assert list(message.get_data()) == sequence_body_2
                    assert isinstance(message._body, SequenceBody)
                    result_dic['sequence_body_2'] = 1

                log.info(message.annotations.get(b'x-opt-sequence-number'))
                log.info(str(message))
        finally:
            await receive_client.close_async()
    finally:
        await send_client.close_async()

    assert len(results) == 6

    # Every body type that was sent must also have been received and verified.
    expected_labels = {
        'data_body_1', 'data_body_2',
        'value_body_1', 'value_body_2',
        'sequence_body_1', 'sequence_body_2',
    }
    missing = expected_labels - set(result_dic)
    assert not missing, "body types never received: {}".format(sorted(missing))


if __name__ == '__main__':
    # Script mode: build the config from environment variables and run the
    # body-type round-trip directly on the event loop.
    env = os.environ
    config = {
        'hostname': env['EVENT_HUB_HOSTNAME'],
        'event_hub': env['EVENT_HUB_NAME'],
        'key_name': env['EVENT_HUB_SAS_POLICY'],
        'access_key': env['EVENT_HUB_SAS_KEY'],
        'consumer_group': "$Default",
        'partition': "0",
    }

    asyncio.get_event_loop().run_until_complete(
        event_hubs_send_different_amqp_body_type_async(config))
