import logging
import os
import time
from mock import patch
import pytest
import kafka.codec

import pytest
from six.moves import xrange
import six

from . import unittest
from kafka import (
    KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message,
    create_gzip_message, KafkaProducer
)
from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
from kafka.errors import (
    ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError,
    KafkaTimeoutError, UnsupportedCodecError
)
from kafka.structs import (
    ProduceRequestPayload, TopicPartition, OffsetAndTimestamp
)

from test.conftest import version
from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import (
    KafkaIntegrationTestCase, kafka_versions, random_string, Timer,
    send_messages
)


@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
def test_kafka_consumer(simple_client, topic, kafka_consumer_factory):
    """Consume 200 messages spread over two partitions with KafkaConsumer.

    100 messages are produced to each of partitions 0 and 1. The consumer is
    iterated until 200 records have been seen, and each partition must have
    contributed 100 distinct offsets.
    """
    kafka_consumer = kafka_consumer_factory(auto_offset_reset='earliest')

    send_messages(simple_client, topic, 0, range(0, 100))
    send_messages(simple_client, topic, 1, range(100, 200))

    expected_total = 200
    offsets_by_partition = {0: set(), 1: set()}
    for seen, record in enumerate(kafka_consumer, start=1):
        logging.debug("Consumed message %s", repr(record))
        offsets_by_partition[record.partition].add(record.offset)
        # Stop iterating once everything we produced has been seen
        if seen >= expected_total:
            break

    assert len(offsets_by_partition[0]) == 100
    assert len(offsets_by_partition[1]) == 100
    kafka_consumer.close()


@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
def test_kafka_consumer_unsupported_encoding(
        topic, kafka_producer_factory, kafka_consumer_factory):
    """Polling gzip data raises UnsupportedCodecError when has_gzip() is False."""
    # Produce one gzip-compressed record and wait for the send to complete
    gzip_producer = kafka_producer_factory(compression_type="gzip")
    payload = b"simple message" * 200
    gzip_producer.send(topic, payload).get(timeout=5)
    gzip_producer.close()

    # Consume while kafka.codec.has_gzip is patched to report "not available"
    expected_error = "Libraries for gzip compression codec not found"
    with patch.object(kafka.codec, "has_gzip", return_value=False):
        consumer = kafka_consumer_factory(auto_offset_reset='earliest')
        with pytest.raises(UnsupportedCodecError, match=expected_error):
            consumer.poll(timeout_ms=2000)


class TestConsumerIntegration(KafkaIntegrationTestCase):
    maxDiff = None

    @classmethod
    def setUpClass(cls):
        """Start one Zookeeper and two Kafka fixtures when KAFKA_VERSION is set.

        Both brokers are created under the same randomly named zk chroot.
        Nothing is started when the KAFKA_VERSION environment variable is
        unset or empty.
        """
        if os.environ.get('KAFKA_VERSION'):
            cls.zk = ZookeeperFixture.instance()
            shared_chroot = random_string(10)
            cls.server1 = KafkaFixture.instance(
                0, cls.zk, zk_chroot=shared_chroot)
            cls.server2 = KafkaFixture.instance(
                1, cls.zk, zk_chroot=shared_chroot)

            # Bootstrapping server
            cls.server = cls.server1

    @classmethod
    def tearDownClass(cls):
        """Close both Kafka fixtures and then Zookeeper, if they were started."""
        if os.environ.get('KAFKA_VERSION'):
            # Brokers first, Zookeeper last
            cls.server1.close()
            cls.server2.close()
            cls.zk.close()

    def send_messages(self, partition, messages):
        """Produce `messages` to one partition of self.topic.

        Each item is stringified, passed through self.msg() and wrapped with
        create_message(). The produce response must report error code 0.

        Returns the list of `.value` attributes of the messages sent.
        """
        wrapped = []
        for item in messages:
            wrapped.append(create_message(self.msg(str(item))))

        request = ProduceRequestPayload(self.topic, partition, messages=wrapped)
        response, = self.client.send_produce_request([request])
        self.assertEqual(response.error, 0)

        return [sent.value for sent in wrapped]

    def send_gzip_message(self, partition, messages):
        """Produce `messages` to one partition as a single gzip message.

        Each item is stringified and passed through self.msg(), then all of
        them are packed by create_gzip_message(). The produce response must
        report error code 0.
        """
        payload_pairs = [(self.msg(str(item)), None) for item in messages]
        gzip_message = create_gzip_message(payload_pairs)
        request = ProduceRequestPayload(
            self.topic, partition, messages=[gzip_message])
        response, = self.client.send_produce_request([request])
        self.assertEqual(response.error, 0)

    def assert_message_count(self, messages, num_messages):
        """Assert `messages` has exactly `num_messages` items, all distinct."""
        # Every expected message arrived ...
        total = len(messages)
        self.assertEqual(total, num_messages)

        # ... and none of them arrived twice
        distinct = len(set(messages))
        self.assertEqual(distinct, num_messages)

    def consumer(self, **kwargs):
        """Build a consumer on self.client for the tests.

        Recognised keyword arguments (all others are passed to the consumer
        class unchanged):
            consumer: class to instantiate, SimpleConsumer by default.
            group: consumer group, None by default.
            topic: topic to read, self.topic by default.
            auto_commit: defaults to False.

        For SimpleConsumer and MultiProcessConsumer, iter_timeout defaults
        to 0.
        """
        if os.environ['KAFKA_VERSION'] != "0.8.0":
            kwargs.setdefault('group', None)
            kwargs.setdefault('auto_commit', False)
        else:
            # Kafka 0.8.0 simply doesn't support offset requests, so the
            # caller's choices are overridden rather than defaulted
            kwargs.update(group=None, auto_commit=False)

        consumer_class = kwargs.pop('consumer', SimpleConsumer)
        group = kwargs.pop('group', None)
        topic = kwargs.pop('topic', self.topic)

        if consumer_class in (SimpleConsumer, MultiProcessConsumer):
            kwargs.setdefault('iter_timeout', 0)

        return consumer_class(self.client, group, topic, **kwargs)

    def kafka_consumer(self, **configs):
        """Return a KafkaConsumer for self.topic bootstrapped from self.server.

        Extra keyword arguments are forwarded to KafkaConsumer as configs.
        """
        bootstrap = '%s:%d' % (self.server.host, self.server.port)
        return KafkaConsumer(self.topic, bootstrap_servers=bootstrap, **configs)

    def kafka_producer(self, **configs):
        """Return a KafkaProducer bootstrapped from self.server.

        Extra keyword arguments are forwarded to KafkaProducer as configs.
        """
        bootstrap = '%s:%d' % (self.server.host, self.server.port)
        return KafkaProducer(bootstrap_servers=bootstrap, **configs)

    def test_simple_consumer(self):
        """SimpleConsumer reads every message produced to both partitions."""
        for partition, payloads in ((0, range(0, 100)), (1, range(100, 200))):
            self.send_messages(partition, payloads)

        # Drain the consumer and check all 200 messages arrived exactly once
        consumer = self.consumer()
        consumed = [msg for msg in consumer]
        self.assert_message_count(consumed, 200)

        consumer.stop()

    def test_simple_consumer_gzip(self):
        """SimpleConsumer reads every message sent as gzip messages."""
        for partition, payloads in ((0, range(0, 100)), (1, range(100, 200))):
            self.send_gzip_message(partition, payloads)

        # Drain the consumer and check all 200 messages arrived exactly once
        gzip_consumer = self.consumer()
        consumed = [msg for msg in gzip_consumer]
        self.assert_message_count(consumed, 200)

        gzip_consumer.stop()

    def test_simple_consumer_smallest_offset_reset(self):
        """An out-of-range seek with auto_offset_reset='smallest' rereads all."""
        self.send_messages(0, range(0, 100))
        self.send_messages(1, range(100, 200))

        consumer = self.consumer(auto_offset_reset='smallest')
        # Move the fetch offset 300 messages past the end (out of range)
        consumer.seek(300, 2)
        # Since auto_offset_reset is set to smallest we should read all 200
        # messages from beginning.
        self.assert_message_count([message for message in consumer], 200)

        # Stop the consumer, as the other SimpleConsumer tests do, so it is
        # not leaked into later tests
        consumer.stop()

    def test_simple_consumer_largest_offset_reset(self):
        """An out-of-range seek with the default reset only sees new messages."""
        self.send_messages(0, range(0, 100))
        self.send_messages(1, range(100, 200))

        # Default largest
        consumer = self.consumer()
        # Move the fetch offset 300 messages past the end (out of range)
        consumer.seek(300, 2)
        # Since auto_offset_reset is set to largest we should not read any
        # messages.
        self.assert_message_count([message for message in consumer], 0)
        # Send 200 new messages to the queue
        self.send_messages(0, range(200, 300))
        self.send_messages(1, range(300, 400))
        # Since the offset is set to largest we should read all the new messages.
        self.assert_message_count([message for message in consumer], 200)

        # Stop the consumer, as the other SimpleConsumer tests do, so it is
        # not leaked into later tests
        consumer.stop()

    def test_simple_consumer_no_reset(self):
        """With auto_offset_reset=None an out-of-range fetch raises."""
        self.send_messages(0, range(0, 100))
        self.send_messages(1, range(100, 200))

        # Automatic offset reset disabled
        consumer = self.consumer(auto_offset_reset=None)
        # Move the fetch offset 300 messages past the end (out of range)
        consumer.seek(300, 2)
        with self.assertRaises(OffsetOutOfRangeError):
            consumer.get_message()

        # Stop the consumer, as the other SimpleConsumer tests do, so it is
        # not leaked into later tests
        consumer.stop()

    @kafka_versions('>=0.8.1')
    def test_simple_consumer_load_initial_offsets(self):
        """A new consumer in a group starts from the group's committed offsets."""
        self.send_messages(0, range(0, 100))
        self.send_messages(1, range(100, 200))

        # Create 1st consumer and change offsets
        consumer = self.consumer(group='test_simple_consumer_load_initial_offsets')
        self.assertEqual(consumer.offsets, {0: 0, 1: 0})
        consumer.offsets.update({0: 51, 1: 101})
        # Update counter after manual offsets update
        consumer.count_since_commit += 1
        consumer.commit()
        # The offsets are committed; release the first consumer before the
        # name is rebound below
        consumer.stop()

        # Create 2nd consumer and check initial offsets
        consumer = self.consumer(group='test_simple_consumer_load_initial_offsets',
                                 auto_commit=False)
        self.assertEqual(consumer.offsets, {0: 51, 1: 101})

        consumer.stop()

    def test_simple_consumer__seek(self):
        """seek() supports end-relative offsets and absolute per-partition ones."""
        self.send_messages(0, range(0, 100))
        self.send_messages(1, range(100, 200))

        consumer = self.consumer()

        # Rewinding N messages from the end (whence=2) must yield N messages
        for rewind in (10, 13):
            consumer.seek(-rewind, 2)
            self.assert_message_count([msg for msg in consumer], rewind)

        # Absolute offset applied without naming a partition
        consumer.seek(100)
        self.assert_message_count([msg for msg in consumer], 0)

        # Absolute offsets on one partition: (offset, partition, expected count)
        absolute_cases = (
            (100, 0, 0),
            (101, 1, 0),
            (90, 0, 10),
            (20, 1, 80),
            (0, 1, 100),
        )
        for offset, partition, expected in absolute_cases:
            consumer.seek(offset, partition=partition)
            self.assert_message_count([msg for msg in consumer], expected)

        consumer.stop()

    def test_simple_consumer_blocking(self):
        """get_messages() waits for the timeout only when it cannot be filled."""
        consumer = self.consumer()

        # Nothing in the queue: expect an empty result after blocking 1 second
        with Timer() as timer:
            batch = consumer.get_messages(block=True, timeout=1)
            self.assert_message_count(batch, 0)
        self.assertGreaterEqual(timer.interval, 1)

        self.send_messages(0, range(0, 5))
        self.send_messages(1, range(5, 10))

        # 10 in the queue, 5 requested: all 5 come back before the timeout
        with Timer() as timer:
            batch = consumer.get_messages(count=5, block=True, timeout=3)
            self.assert_message_count(batch, 5)
        self.assertLess(timer.interval, 3)

        # 5 left, 10 requested: get the 5 back after blocking 1 second
        with Timer() as timer:
            batch = consumer.get_messages(count=10, block=True, timeout=1)
            self.assert_message_count(batch, 5)
        self.assertGreaterEqual(timer.interval, 1)

        # 5 more in the queue, 10 requested, but block only for 1 message or
        # 1 second: get the 5 back without waiting out the timeout
        self.send_messages(0, range(0, 3))
        self.send_messages(1, range(3, 5))
        with Timer() as timer:
            batch = consumer.get_messages(count=10, block=1, timeout=1)
            self.assert_message_count(batch, 5)
        self.assertLessEqual(timer.interval, 1)

        consumer.stop()

    def test_simple_consumer_pending(self):
        """pending() reports unread message counts, overall and per partition."""
        # make sure that we start with no pending messages
        consumer = self.consumer()
        self.assertEqual(consumer.pending(), 0)
        self.assertEqual(consumer.pending(partitions=[0]), 0)
        self.assertEqual(consumer.pending(partitions=[1]), 0)
        # Release the first consumer before the name is rebound below
        consumer.stop()

        # Produce 10 messages to partitions 0 and 1
        self.send_messages(0, range(0, 10))
        self.send_messages(1, range(10, 20))

        consumer = self.consumer()

        self.assertEqual(consumer.pending(), 20)
        self.assertEqual(consumer.pending(partitions=[0]), 10)
        self.assertEqual(consumer.pending(partitions=[1]), 10)

        # move to last message, so one partition should have 1 pending
        # message and other 0
        consumer.seek(-1, 2)
        self.assertEqual(consumer.pending(), 1)

        # Which partition holds the remaining message is not fixed, so the
        # two counts are compared as a set
        pending_part1 = consumer.pending(partitions=[0])
        pending_part2 = consumer.pending(partitions=[1])
        self.assertEqual(set([0, 1]), set([pending_part1, pending_part2]))
        consumer.stop()

    @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky')
    def test_multi_process_consumer(self):
        """MultiProcessConsumer reads every message from both partitions."""
        # 100 messages for each of partitions 0 and 1
        for partition, payloads in ((0, range(0, 100)), (1, range(100, 200))):
            self.send_messages(partition, payloads)

        consumer = self.consumer(consumer=MultiProcessConsumer)
        consumed = [msg for msg in consumer]
        self.assert_message_count(consumed, 200)

        consumer.stop()

    @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky')
    def test_multi_process_consumer_blocking(self):
        """MultiProcessConsumer.get_messages() blocking behaviour."""
        consumer = self.consumer(consumer=MultiProcessConsumer)

        # Nothing in the queue: expect an empty result after blocking 1 second
        with Timer() as timer:
            batch = consumer.get_messages(block=True, timeout=1)
            self.assert_message_count(batch, 0)
        self.assertGreaterEqual(timer.interval, 1)

        # Send 10 messages
        self.send_messages(0, range(0, 10))

        # 10 in the queue, 5 requested: they come back within 1 second
        with Timer() as timer:
            batch = consumer.get_messages(count=5, block=True, timeout=5)
            self.assert_message_count(batch, 5)
        self.assertLessEqual(timer.interval, 1)

        # 5 left, 10 requested: get the 5 back after blocking 1 second
        with Timer() as timer:
            batch = consumer.get_messages(count=10, block=True, timeout=1)
            self.assert_message_count(batch, 5)
        self.assertGreaterEqual(timer.interval, 1)

        # 5 more in the queue, 10 requested, but block only for 1 message or
        # 1 second: get at least one back without waiting out the timeout
        self.send_messages(0, range(0, 5))
        with Timer() as timer:
            batch = consumer.get_messages(count=10, block=1, timeout=1)
            batch_size = len(batch)
            self.assertGreaterEqual(batch_size, 1)
            self.assert_message_count(batch, batch_size)
        self.assertLessEqual(timer.interval, 1)

        consumer.stop()

    @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky')
    def test_multi_proc_pending(self):
        """MultiProcessConsumer.pending() counts unread messages per partition."""
        for partition, payloads in ((0, range(0, 10)), (1, range(10, 20))):
            self.send_messages(partition, payloads)

        # group=None and auto_commit=False avoid interactions with the
        # offset commit/fetch apis
        consumer = MultiProcessConsumer(
            self.client, None, self.topic, auto_commit=False, iter_timeout=0)

        self.assertEqual(consumer.pending(), 20)
        for partition in (0, 1):
            self.assertEqual(consumer.pending(partitions=[partition]), 10)

        consumer.stop()

    @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky')
    @kafka_versions('>=0.8.1')
    def test_multi_process_consumer_load_initial_offsets(self):
        """A MultiProcessConsumer starts from the group's committed offsets."""
        group_name = 'test_multi_process_consumer_load_initial_offsets'
        self.send_messages(0, range(0, 10))
        self.send_messages(1, range(10, 20))

        # First consumer: move the offsets by hand and commit them
        first = self.consumer(group=group_name)
        self.assertEqual(first.offsets, {0: 0, 1: 0})
        first.offsets.update({0: 5, 1: 15})
        # Bump the counter after the manual offsets update
        first.count_since_commit += 1
        first.commit()

        # Second consumer in the same group: check its initial offsets
        second = self.consumer(consumer=MultiProcessConsumer,
                               group=group_name,
                               auto_commit=False)
        self.assertEqual(second.offsets, {0: 5, 1: 15})

    def test_large_messages(self):
        """Small and 5000-character messages are all read with a 60000 max buffer."""
        # Ten "normal" size messages
        small_payloads = [str(index) for index in range(10)]
        small_messages = self.send_messages(0, small_payloads)

        # Ten large messages (bigger than the default fetch size)
        large_payloads = [random_string(5000) for _ in range(10)]
        large_messages = self.send_messages(0, large_payloads)

        # Brokers prior to 0.11 will return the next message
        # if it is smaller than max_bytes (called buffer_size in SimpleConsumer)
        # Brokers 0.11 and later that store messages in v2 format
        # internally will return the next message only if the
        # full MessageSet is smaller than max_bytes.
        # For that reason, we set the max buffer size to a little more
        # than the size of all large messages combined
        consumer = self.consumer(max_buffer_size=60000)

        expected = set(small_messages + large_messages)
        received = {item.message.value for item in consumer}
        self.assertEqual(expected, received)

        consumer.stop()

    def test_huge_messages(self):
        """A message over MAX_FETCH_BUFFER_SIZE_BYTES needs max_buffer_size=None."""
        oversized = create_message(
            random_string(MAX_FETCH_BUFFER_SIZE_BYTES + 10))
        huge_message, = self.send_messages(0, [oversized])

        # A consumer with the default buffer size fails to get the message
        default_consumer = self.consumer()
        with self.assertRaises(ConsumerFetchSizeTooSmall):
            default_consumer.get_message(False, 0.1)
        default_consumer.stop()

        # A consumer with no fetch size limit, reading partition 0 only
        big_consumer = self.consumer(max_buffer_size=None, partitions=[0])

        # Position on the last message
        big_consumer.seek(-1, 2)

        # The giant message is now consumed successfully
        fetched = big_consumer.get_message(block=False, timeout=10)
        self.assertIsNotNone(fetched)
        self.assertEqual(fetched.message.value, huge_message)

        big_consumer.stop()

    @kafka_versions('>=0.8.1')
    def test_offset_behavior__resuming_behavior(self):
        """A second consumer in the group resumes from the auto-committed offset."""
        self.send_messages(0, range(0, 100))
        self.send_messages(1, range(100, 200))

        # Both consumers share one group and commit every 20 messages
        consumer_options = dict(
            group='test_offset_behavior__resuming_behavior',
            auto_commit=True,
            auto_commit_every_t=None,
            auto_commit_every_n=20,
        )

        # First consumer grabs the first 195 messages
        consumer1 = self.consumer(**consumer_options)
        first_batch = [
            consumer1.get_message().message.value for _ in xrange(195)
        ]
        self.assert_message_count(first_batch, 195)

        # The total offset across both partitions should be at 180
        consumer2 = self.consumer(**consumer_options)

        # 181-200
        remaining = [msg for msg in consumer2]
        self.assert_message_count(remaining, 20)

        consumer1.stop()
        consumer2.stop()

    @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky')
    @kafka_versions('>=0.8.1')
    def test_multi_process_offset_behavior__resuming_behavior(self):
        """A second MultiProcessConsumer resumes from the auto-committed offset."""
        self.send_messages(0, range(0, 100))
        self.send_messages(1, range(100, 200))

        # Both consumers share one group and commit every 20 messages
        consumer_options = dict(
            consumer=MultiProcessConsumer,
            group='test_multi_process_offset_behavior__resuming_behavior',
            auto_commit=True,
            auto_commit_every_t=None,
            auto_commit_every_n=20,
        )

        # First consumer grabs the first 195 messages
        consumer1 = self.consumer(**consumer_options)
        first_batch = []
        for taken, item in enumerate(consumer1, start=1):
            first_batch.append(item.message.value)
            if taken >= 195:
                break
        self.assert_message_count(first_batch, 195)

        # The total offset across both partitions should be at 180
        consumer2 = self.consumer(**consumer_options)

        # 181-200
        remaining = [msg for msg in consumer2]
        self.assert_message_count(remaining, 20)

        consumer1.stop()
        consumer2.stop()

    # TODO: Make this a unit test -- should not require integration
    def test_fetch_buffer_size(self):
        """Messages between buffer_size and max_buffer_size are still consumed.

        One message larger than the initial buffer but smaller than the
        maximum buffer is produced to each partition; the consumer must
        return both.
        """
        # Test parameters (see issue 135 / PR 136)
        TEST_MESSAGE_SIZE = 1048
        INIT_BUFFER_SIZE = 1024
        MAX_BUFFER_SIZE = 2048
        # Sanity-check the relationship between the parameters themselves
        self.assertGreater(TEST_MESSAGE_SIZE, INIT_BUFFER_SIZE)
        self.assertLess(TEST_MESSAGE_SIZE, MAX_BUFFER_SIZE)
        self.assertEqual(MAX_BUFFER_SIZE, 2 * INIT_BUFFER_SIZE)

        # Use the named parameters so the checks above describe what is
        # actually sent and configured
        self.send_messages(0, ["x" * TEST_MESSAGE_SIZE])
        self.send_messages(1, ["x" * TEST_MESSAGE_SIZE])

        consumer = self.consumer(buffer_size=INIT_BUFFER_SIZE,
                                 max_buffer_size=MAX_BUFFER_SIZE)
        messages = [message for message in consumer]
        self.assertEqual(len(messages), 2)

        # Stop the consumer, as the other SimpleConsumer tests do
        consumer.stop()

    def test_kafka_consumer__blocking(self):
        """KafkaConsumer iteration ends after consumer_timeout_ms without data."""
        TIMEOUT_MS = 500
        timeout_secs = TIMEOUT_MS / 1000.0
        consumer = self.kafka_consumer(auto_offset_reset='earliest',
                                       enable_auto_commit=False,
                                       consumer_timeout_ms=TIMEOUT_MS)

        # Manual assignment avoids overhead of consumer group mgmt
        consumer.unsubscribe()
        consumer.assign([TopicPartition(self.topic, 0)])

        # Nothing in the queue: iteration stops after blocking 500ms
        with Timer() as timer:
            with self.assertRaises(StopIteration):
                next(consumer)
        self.assertGreaterEqual(timer.interval, timeout_secs)

        self.send_messages(0, range(0, 10))

        # 10 in the queue, take 5: they come back without hitting the timeout
        seen = set()
        with Timer() as timer:
            for _ in range(5):
                record = next(consumer)
                seen.add((record.partition, record.offset))
        self.assertEqual(len(seen), 5)
        self.assertLess(timer.interval, timeout_secs)

        # Try to take 10 with 5 left: get 5, then block 500ms and stop
        seen = set()
        with Timer() as timer:
            with self.assertRaises(StopIteration):
                for _ in range(10):
                    record = next(consumer)
                    seen.add((record.partition, record.offset))
        self.assertEqual(len(seen), 5)
        self.assertGreaterEqual(timer.interval, timeout_secs)
        consumer.close()

    @kafka_versions('>=0.8.1')
    def test_kafka_consumer__offset_commit_resume(self):
        """A second KafkaConsumer in the group resumes after the first one."""
        GROUP_ID = random_string(10)

        self.send_messages(0, range(0, 100))
        self.send_messages(1, range(100, 200))

        # Both consumers use the same group and auto-commit settings
        group_configs = dict(
            group_id=GROUP_ID,
            enable_auto_commit=True,
            auto_commit_interval_ms=100,
            auto_offset_reset='earliest',
        )

        # First consumer grabs the first 180 messages
        consumer1 = self.kafka_consumer(**group_configs)
        output_msgs1 = [next(consumer1) for _ in xrange(180)]
        self.assert_message_count(output_msgs1, 180)
        consumer1.close()

        # The total offset across both partitions should be at 180
        consumer2 = self.kafka_consumer(**group_configs)

        # 181-200
        output_msgs2 = [next(consumer2) for _ in xrange(20)]
        self.assert_message_count(output_msgs2, 20)

        # Together the two batches must cover all 200 messages
        combined = set(output_msgs1) | set(output_msgs2)
        self.assertEqual(len(combined), 200)
        consumer2.close()

    @kafka_versions('>=0.10.1')
    def test_kafka_consumer_max_bytes_simple(self):
        """With fetch_max_bytes=300, polling still returns data from both partitions."""
        self.send_messages(0, range(100, 200))
        self.send_messages(1, range(200, 300))

        consumer = self.kafka_consumer(
            auto_offset_reset='earliest', fetch_max_bytes=300)

        # Poll ten times, recording each partition that returned any message
        seen_partitions = set()
        for _ in range(10):
            polled = consumer.poll(timeout_ms=100)
            for partition, records in six.iteritems(polled):
                for _record in records:
                    seen_partitions.add(partition)

        # Check that we fetched at least 1 message from both partitions
        both_partitions = {
            TopicPartition(self.topic, 0),
            TopicPartition(self.topic, 1),
        }
        self.assertEqual(seen_partitions, both_partitions)
        consumer.close()

    @kafka_versions('>=0.10.1')
    def test_kafka_consumer_max_bytes_one_msg(self):
        """With fetch_max_bytes=1 the consumer can still read 10 messages."""
        # Only one partition is used so that there are no parallel requests
        # to 2 nodes for data.
        self.send_messages(0, range(100, 200))

        # FetchResponse_v3 should always include at least 1 full msg, so
        # fetch_max_bytes=1 should give 1 msg at a time. But 0.11.0.0 returns
        # 1 MessageSet at a time when the broker stores the messages in the
        # new v2 format.
        #
        # DP Note: This is a strange test. The consumer shouldn't care
        # how many messages are included in a FetchResponse, as long as it is
        # non-zero. I would not mind if we deleted this test. It caused
        # a minor headache when testing 0.11.0.0.
        group_name = 'test-kafka-consumer-max-bytes-one-msg-' + random_string(5)
        consumer = self.kafka_consumer(
            group_id=group_name,
            auto_offset_reset='earliest',
            consumer_timeout_ms=5000,
            fetch_max_bytes=1)

        fetched_msgs = []
        for _ in range(10):
            fetched_msgs.append(next(consumer))
        self.assertEqual(len(fetched_msgs), 10)
        consumer.close()

    @kafka_versions('>=0.10.1')
    def test_kafka_consumer_offsets_for_time(self):
        """offsets_for_times(), beginning_offsets() and end_offsets() on one partition.

        Two messages are produced to partition 0 with explicit timestamps
        two seconds apart; lookups at, between and outside those timestamps
        are checked against the offsets the producer reported.
        """
        # Whole-second "now", expressed in milliseconds
        late_time = int(time.time()) * 1000
        middle_time = late_time - 1000
        early_time = late_time - 2000
        tp = TopicPartition(self.topic, 0)

        timeout = 10
        kafka_producer = self.kafka_producer()
        early_msg = kafka_producer.send(
            self.topic, partition=0, value=b"first",
            timestamp_ms=early_time).get(timeout)
        late_msg = kafka_producer.send(
            self.topic, partition=0, value=b"last",
            timestamp_ms=late_time).get(timeout)
        # Both sends have completed; close the producer so it is not leaked
        kafka_producer.close()

        consumer = self.kafka_consumer()
        offsets = consumer.offsets_for_times({tp: early_time})
        self.assertEqual(len(offsets), 1)
        self.assertEqual(offsets[tp].offset, early_msg.offset)
        self.assertEqual(offsets[tp].timestamp, early_time)

        # A timestamp between the two messages resolves to the later one
        offsets = consumer.offsets_for_times({tp: middle_time})
        self.assertEqual(offsets[tp].offset, late_msg.offset)
        self.assertEqual(offsets[tp].timestamp, late_time)

        offsets = consumer.offsets_for_times({tp: late_time})
        self.assertEqual(offsets[tp].offset, late_msg.offset)
        self.assertEqual(offsets[tp].timestamp, late_time)

        offsets = consumer.offsets_for_times({})
        self.assertEqual(offsets, {})

        # Out of bound timestamps check

        offsets = consumer.offsets_for_times({tp: 0})
        self.assertEqual(offsets[tp].offset, early_msg.offset)
        self.assertEqual(offsets[tp].timestamp, early_time)

        offsets = consumer.offsets_for_times({tp: 9999999999999})
        self.assertEqual(offsets[tp], None)

        # Beginning/End offsets

        offsets = consumer.beginning_offsets([tp])
        self.assertEqual(offsets, {
            tp: early_msg.offset,
        })
        offsets = consumer.end_offsets([tp])
        self.assertEqual(offsets, {
            tp: late_msg.offset + 1
        })
        consumer.close()

    @kafka_versions('>=0.10.1')
    def test_kafka_consumer_offsets_search_many_partitions(self):
        """Offset lookups work when several partitions are queried at once.

        One message with the same timestamp is produced to each of
        partitions 0 and 1; offsets_for_times(), beginning_offsets() and
        end_offsets() are then queried for both partitions together.
        """
        tp0 = TopicPartition(self.topic, 0)
        tp1 = TopicPartition(self.topic, 1)

        kafka_producer = self.kafka_producer()
        send_time = int(time.time() * 1000)
        timeout = 10
        p0msg = kafka_producer.send(
            self.topic, partition=0, value=b"XXX",
            timestamp_ms=send_time).get(timeout)
        p1msg = kafka_producer.send(
            self.topic, partition=1, value=b"XXX",
            timestamp_ms=send_time).get(timeout)
        # Both sends have completed; close the producer so it is not leaked
        kafka_producer.close()

        consumer = self.kafka_consumer()
        offsets = consumer.offsets_for_times({
            tp0: send_time,
            tp1: send_time
        })

        self.assertEqual(offsets, {
            tp0: OffsetAndTimestamp(p0msg.offset, send_time),
            tp1: OffsetAndTimestamp(p1msg.offset, send_time)
        })

        offsets = consumer.beginning_offsets([tp0, tp1])
        self.assertEqual(offsets, {
            tp0: p0msg.offset,
            tp1: p1msg.offset
        })

        offsets = consumer.end_offsets([tp0, tp1])
        self.assertEqual(offsets, {
            tp0: p0msg.offset + 1,
            tp1: p1msg.offset + 1
        })
        consumer.close()

    @kafka_versions('<0.10.1')
    def test_kafka_consumer_offsets_for_time_old(self):
        """offsets_for_times() raises UnsupportedVersionError before 0.10.1."""
        consumer = self.kafka_consumer()
        tp = TopicPartition(self.topic, 0)

        try:
            with self.assertRaises(UnsupportedVersionError):
                # Timestamp in milliseconds, as in the other tests in this file
                consumer.offsets_for_times({tp: int(time.time() * 1000)})
        finally:
            # Close the consumer whether or not the assertion passed
            consumer.close()

    @kafka_versions('>=0.10.1')
    def test_kafka_consumer_offsets_for_times_errors(self):
        """offsets_for_times() rejects bad timestamps and times out on a bad partition."""
        consumer = self.kafka_consumer(fetch_max_wait_ms=200,
                                       request_timeout_ms=500)
        tp = TopicPartition(self.topic, 0)
        # Partition 100 is outside the partitions (0 and 1) these tests use
        bad_tp = TopicPartition(self.topic, 100)

        try:
            with self.assertRaises(ValueError):
                consumer.offsets_for_times({tp: -1})

            with self.assertRaises(KafkaTimeoutError):
                consumer.offsets_for_times({bad_tp: 0})
        finally:
            # Close the consumer whether or not the assertions passed
            consumer.close()
