# pylint: skip-file
from __future__ import absolute_import

from collections import defaultdict
from random import randint, sample

import pytest

from kafka.structs import TopicPartition
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor, StickyAssignorUserDataV1
from kafka.coordinator.protocol import ConsumerProtocolMemberAssignment, ConsumerProtocolMemberMetadata
import six


@pytest.fixture(autouse=True)
def reset_sticky_assignor():
    yield
    StickyPartitionAssignor.member_assignment = None
    StickyPartitionAssignor.generation = -1


def create_cluster(mocker, topics, topics_partitions=None, topic_partitions_lambda=None):
    cluster = mocker.MagicMock()
    cluster.topics.return_value = topics
    if topics_partitions is not None:
        cluster.partitions_for_topic.return_value = topics_partitions
    if topic_partitions_lambda is not None:
        cluster.partitions_for_topic.side_effect = topic_partitions_lambda
    return cluster


def test_assignor_roundrobin(mocker):
    assignor = RoundRobinPartitionAssignor

    member_metadata = {
        'C0': assignor.metadata({'t0', 't1'}),
        'C1': assignor.metadata({'t0', 't1'}),
    }

    cluster = create_cluster(mocker, {'t0', 't1'}, topics_partitions={0, 1, 2})
    ret = assignor.assign(cluster, member_metadata)
    expected = {
        'C0': ConsumerProtocolMemberAssignment(
            assignor.version, [('t0', [0, 2]), ('t1', [1])], b''),
        'C1': ConsumerProtocolMemberAssignment(
            assignor.version, [('t0', [1]), ('t1', [0, 2])], b'')
    }
    assert ret == expected
    assert set(ret) == set(expected)
    for member in ret:
        assert ret[member].encode() == expected[member].encode()


def test_assignor_range(mocker):
    assignor = RangePartitionAssignor

    member_metadata = {
        'C0': assignor.metadata({'t0', 't1'}),
        'C1': assignor.metadata({'t0', 't1'}),
    }

    cluster = create_cluster(mocker, {'t0', 't1'}, topics_partitions={0, 1, 2})
    ret = assignor.assign(cluster, member_metadata)
    expected = {
        'C0': ConsumerProtocolMemberAssignment(
            assignor.version, [('t0', [0, 1]), ('t1', [0, 1])], b''),
        'C1': ConsumerProtocolMemberAssignment(
            assignor.version, [('t0', [2]), ('t1', [2])], b'')
    }
    assert ret == expected
    assert set(ret) == set(expected)
    for member in ret:
        assert ret[member].encode() == expected[member].encode()


def test_sticky_assignor1(mocker):
    """
    Given: there are three consumers C0, C1, C2,
        four topics t0, t1, t2, t3, and each topic has 2 partitions,
        resulting in partitions t0p0, t0p1, t1p0, t1p1, t2p0, t2p1, t3p0, t3p1.
        Each consumer is subscribed to all three topics.
    Then: perform fresh assignment
    Expected: the assignment is
    - C0: [t0p0, t1p1, t3p0]
    - C1: [t0p1, t2p0, t3p1]
    - C2: [t1p0, t2p1]
    Then: remove C1 consumer and perform the reassignment
    Expected: the new assignment is
    - C0 [t0p0, t1p1, t2p0, t3p0]
    - C2 [t0p1, t1p0, t2p1, t3p1]
    """
    cluster = create_cluster(mocker, topics={'t0', 't1', 't2', 't3'}, topics_partitions={0, 1})

    subscriptions = {
        'C0': {'t0', 't1', 't2', 't3'},
        'C1': {'t0', 't1', 't2', 't3'},
        'C2': {'t0', 't1', 't2', 't3'},
    }
    member_metadata = make_member_metadata(subscriptions)

    sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    expected_assignment = {
        'C0': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t0', [0]), ('t1', [1]), ('t3', [0])], b''),
        'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t0', [1]), ('t2', [0]), ('t3', [1])], b''),
        'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0]), ('t2', [1])], b''),
    }
    assert_assignment(sticky_assignment, expected_assignment)

    del subscriptions['C1']
    member_metadata = {}
    for member, topics in six.iteritems(subscriptions):
        member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())

    sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    expected_assignment = {
        'C0': ConsumerProtocolMemberAssignment(
            StickyPartitionAssignor.version, [('t0', [0]), ('t1', [1]), ('t2', [0]), ('t3', [0])], b''
        ),
        'C2': ConsumerProtocolMemberAssignment(
            StickyPartitionAssignor.version, [('t0', [1]), ('t1', [0]), ('t2', [1]), ('t3', [1])], b''
        ),
    }
    assert_assignment(sticky_assignment, expected_assignment)


def test_sticky_assignor2(mocker):
    """
    Given: there are three consumers C0, C1, C2,
    and three topics t0, t1, t2, with 1, 2, and 3 partitions respectively.
    Therefore, the partitions are t0p0, t1p0, t1p1, t2p0, t2p1, t2p2.
    C0 is subscribed to t0;
    C1 is subscribed to t0, t1;
    and C2 is subscribed to t0, t1, t2.
    Then: perform the assignment
    Expected: the assignment is
    - C0 [t0p0]
    - C1 [t1p0, t1p1]
    - C2 [t2p0, t2p1, t2p2]
    Then: remove C0 and perform the assignment
    Expected: the assignment is
    - C1 [t0p0, t1p0, t1p1]
    - C2 [t2p0, t2p1, t2p2]
    """

    partitions = {'t0': {0}, 't1': {0, 1}, 't2': {0, 1, 2}}
    cluster = create_cluster(mocker, topics={'t0', 't1', 't2'}, topic_partitions_lambda=lambda t: partitions[t])

    subscriptions = {
        'C0': {'t0'},
        'C1': {'t0', 't1'},
        'C2': {'t0', 't1', 't2'},
    }
    member_metadata = {}
    for member, topics in six.iteritems(subscriptions):
        member_metadata[member] = build_metadata(topics, [])

    sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    expected_assignment = {
        'C0': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t0', [0])], b''),
        'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0, 1])], b''),
        'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t2', [0, 1, 2])], b''),
    }
    assert_assignment(sticky_assignment, expected_assignment)

    del subscriptions['C0']
    member_metadata = {}
    for member, topics in six.iteritems(subscriptions):
        member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())

    sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    expected_assignment = {
        'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t0', [0]), ('t1', [0, 1])], b''),
        'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t2', [0, 1, 2])], b''),
    }
    assert_assignment(sticky_assignment, expected_assignment)


def test_sticky_one_consumer_no_topic(mocker):
    cluster = create_cluster(mocker, topics={}, topics_partitions={})

    subscriptions = {
        'C': set(),
    }
    member_metadata = make_member_metadata(subscriptions)

    sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    expected_assignment = {
        'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [], b''),
    }
    assert_assignment(sticky_assignment, expected_assignment)


def test_sticky_one_consumer_nonexisting_topic(mocker):
    cluster = create_cluster(mocker, topics={}, topics_partitions={})

    subscriptions = {
        'C': {'t'},
    }
    member_metadata = make_member_metadata(subscriptions)

    sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    expected_assignment = {
        'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [], b''),
    }
    assert_assignment(sticky_assignment, expected_assignment)


def test_sticky_one_consumer_one_topic(mocker):
    cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2})

    subscriptions = {
        'C': {'t'},
    }
    member_metadata = make_member_metadata(subscriptions)

    sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    expected_assignment = {
        'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''),
    }
    assert_assignment(sticky_assignment, expected_assignment)


def test_sticky_should_only_assign_partitions_from_subscribed_topics(mocker):
    cluster = create_cluster(mocker, topics={'t', 'other-t'}, topics_partitions={0, 1, 2})

    subscriptions = {
        'C': {'t'},
    }
    member_metadata = make_member_metadata(subscriptions)

    sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    expected_assignment = {
        'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''),
    }
    assert_assignment(sticky_assignment, expected_assignment)


def test_sticky_one_consumer_multiple_topics(mocker):
    cluster = create_cluster(mocker, topics={'t1', 't2'}, topics_partitions={0, 1, 2})

    subscriptions = {
        'C': {'t1', 't2'},
    }
    member_metadata = make_member_metadata(subscriptions)

    sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    expected_assignment = {
        'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0, 1, 2]), ('t2', [0, 1, 2])], b''),
    }
    assert_assignment(sticky_assignment, expected_assignment)


def test_sticky_two_consumers_one_topic_one_partition(mocker):
    cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0})

    subscriptions = {
        'C1': {'t'},
        'C2': {'t'},
    }
    member_metadata = make_member_metadata(subscriptions)

    sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    expected_assignment = {
        'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [0])], b''),
        'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [], b''),
    }
    assert_assignment(sticky_assignment, expected_assignment)


def test_sticky_two_consumers_one_topic_two_partitions(mocker):
    cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1})

    subscriptions = {
        'C1': {'t'},
        'C2': {'t'},
    }
    member_metadata = make_member_metadata(subscriptions)

    sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    expected_assignment = {
        'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [0])], b''),
        'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [1])], b''),
    }
    assert_assignment(sticky_assignment, expected_assignment)


def test_sticky_multiple_consumers_mixed_topic_subscriptions(mocker):
    partitions = {'t1': {0, 1, 2}, 't2': {0, 1}}
    cluster = create_cluster(mocker, topics={'t1', 't2'}, topic_partitions_lambda=lambda t: partitions[t])

    subscriptions = {
        'C1': {'t1'},
        'C2': {'t1', 't2'},
        'C3': {'t1'},
    }
    member_metadata = make_member_metadata(subscriptions)

    sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    expected_assignment = {
        'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0, 2])], b''),
        'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t2', [0, 1])], b''),
        'C3': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [1])], b''),
    }
    assert_assignment(sticky_assignment, expected_assignment)


def test_sticky_add_remove_consumer_one_topic(mocker):
    cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2})

    subscriptions = {
        'C1': {'t'},
    }
    member_metadata = make_member_metadata(subscriptions)

    assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    expected_assignment = {
        'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''),
    }
    assert_assignment(assignment, expected_assignment)

    subscriptions = {
        'C1': {'t'},
        'C2': {'t'},
    }
    member_metadata = {}
    for member, topics in six.iteritems(subscriptions):
        member_metadata[member] = build_metadata(
            topics, assignment[member].partitions() if member in assignment else []
        )

    assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    verify_validity_and_balance(subscriptions, assignment)

    subscriptions = {
        'C2': {'t'},
    }
    member_metadata = {}
    for member, topics in six.iteritems(subscriptions):
        member_metadata[member] = build_metadata(topics, assignment[member].partitions())

    assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    verify_validity_and_balance(subscriptions, assignment)
    assert len(assignment['C2'].assignment[0][1]) == 3


def test_sticky_add_remove_topic_two_consumers(mocker):
    cluster = create_cluster(mocker, topics={'t1', 't2'}, topics_partitions={0, 1, 2})

    subscriptions = {
        'C1': {'t1'},
        'C2': {'t1'},
    }
    member_metadata = make_member_metadata(subscriptions)

    sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    expected_assignment = {
        'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0, 2])], b''),
        'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [1])], b''),
    }
    assert_assignment(sticky_assignment, expected_assignment)

    subscriptions = {
        'C1': {'t1', 't2'},
        'C2': {'t1', 't2'},
    }
    member_metadata = {}
    for member, topics in six.iteritems(subscriptions):
        member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())

    sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    expected_assignment = {
        'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0, 2]), ('t2', [1])], b''),
        'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [1]), ('t2', [0, 2])], b''),
    }
    assert_assignment(sticky_assignment, expected_assignment)

    subscriptions = {
        'C1': {'t2'},
        'C2': {'t2'},
    }
    member_metadata = {}
    for member, topics in six.iteritems(subscriptions):
        member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())

    sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    expected_assignment = {
        'C1': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t2', [1])], b''),
        'C2': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t2', [0, 2])], b''),
    }
    assert_assignment(sticky_assignment, expected_assignment)


def test_sticky_reassignment_after_one_consumer_leaves(mocker):
    partitions = dict([('t{}'.format(i), set(range(i))) for i in range(1, 20)])
    cluster = create_cluster(
        mocker, topics=set(['t{}'.format(i) for i in range(1, 20)]), topic_partitions_lambda=lambda t: partitions[t]
    )

    subscriptions = {}
    for i in range(1, 20):
        topics = set()
        for j in range(1, i + 1):
            topics.add('t{}'.format(j))
        subscriptions['C{}'.format(i)] = topics

    member_metadata = make_member_metadata(subscriptions)

    assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    verify_validity_and_balance(subscriptions, assignment)

    del subscriptions['C10']
    member_metadata = {}
    for member, topics in six.iteritems(subscriptions):
        member_metadata[member] = build_metadata(topics, assignment[member].partitions())

    assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    verify_validity_and_balance(subscriptions, assignment)
    assert StickyPartitionAssignor._latest_partition_movements.are_sticky()


def test_sticky_reassignment_after_one_consumer_added(mocker):
    cluster = create_cluster(mocker, topics={'t'}, topics_partitions=set(range(20)))

    subscriptions = defaultdict(set)
    for i in range(1, 10):
        subscriptions['C{}'.format(i)] = {'t'}

    member_metadata = make_member_metadata(subscriptions)

    assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    verify_validity_and_balance(subscriptions, assignment)

    subscriptions['C10'] = {'t'}
    member_metadata = {}
    for member, topics in six.iteritems(subscriptions):
        member_metadata[member] = build_metadata(
            topics, assignment[member].partitions() if member in assignment else []
        )
    assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    verify_validity_and_balance(subscriptions, assignment)
    assert StickyPartitionAssignor._latest_partition_movements.are_sticky()


def test_sticky_same_subscriptions(mocker):
    partitions = dict([('t{}'.format(i), set(range(i))) for i in range(1, 15)])
    cluster = create_cluster(
        mocker, topics=set(['t{}'.format(i) for i in range(1, 15)]), topic_partitions_lambda=lambda t: partitions[t]
    )

    subscriptions = defaultdict(set)
    for i in range(1, 9):
        for j in range(1, len(six.viewkeys(partitions)) + 1):
            subscriptions['C{}'.format(i)].add('t{}'.format(j))

    member_metadata = make_member_metadata(subscriptions)

    assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    verify_validity_and_balance(subscriptions, assignment)

    del subscriptions['C5']
    member_metadata = {}
    for member, topics in six.iteritems(subscriptions):
        member_metadata[member] = build_metadata(topics, assignment[member].partitions())
    assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    verify_validity_and_balance(subscriptions, assignment)
    assert StickyPartitionAssignor._latest_partition_movements.are_sticky()


def test_sticky_large_assignment_with_multiple_consumers_leaving(mocker):
    n_topics = 40
    n_consumers = 200

    all_topics = set(['t{}'.format(i) for i in range(1, n_topics + 1)])
    partitions = dict([(t, set(range(1, randint(0, 10) + 1))) for t in all_topics])
    cluster = create_cluster(mocker, topics=all_topics, topic_partitions_lambda=lambda t: partitions[t])

    subscriptions = defaultdict(set)
    for i in range(1, n_consumers + 1):
        for j in range(0, randint(1, 20)):
            subscriptions['C{}'.format(i)].add('t{}'.format(randint(1, n_topics)))

    member_metadata = make_member_metadata(subscriptions)

    assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    verify_validity_and_balance(subscriptions, assignment)

    member_metadata = {}
    for member, topics in six.iteritems(subscriptions):
        member_metadata[member] = build_metadata(topics, assignment[member].partitions())

    for i in range(50):
        member = 'C{}'.format(randint(1, n_consumers))
        if member in subscriptions:
            del subscriptions[member]
            del member_metadata[member]

    assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    verify_validity_and_balance(subscriptions, assignment)
    assert StickyPartitionAssignor._latest_partition_movements.are_sticky()


def test_new_subscription(mocker):
    cluster = create_cluster(mocker, topics={'t1', 't2', 't3', 't4'}, topics_partitions={0})

    subscriptions = defaultdict(set)
    for i in range(3):
        for j in range(i, 3 * i - 2 + 1):
            subscriptions['C{}'.format(i)].add('t{}'.format(j))

    member_metadata = make_member_metadata(subscriptions)

    assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    verify_validity_and_balance(subscriptions, assignment)

    subscriptions['C0'].add('t1')
    member_metadata = {}
    for member, topics in six.iteritems(subscriptions):
        member_metadata[member] = build_metadata(topics, [])

    assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    verify_validity_and_balance(subscriptions, assignment)
    assert StickyPartitionAssignor._latest_partition_movements.are_sticky()


def test_move_existing_assignments(mocker):
    cluster = create_cluster(mocker, topics={'t1', 't2', 't3', 't4', 't5', 't6'}, topics_partitions={0})

    subscriptions = {
        'C1': {'t1', 't2'},
        'C2': {'t1', 't2', 't3', 't4'},
        'C3': {'t2', 't3', 't4', 't5', 't6'},
    }
    member_assignments = {
        'C1': [TopicPartition('t1', 0)],
        'C2': [TopicPartition('t2', 0), TopicPartition('t3', 0)],
        'C3': [TopicPartition('t4', 0), TopicPartition('t5', 0), TopicPartition('t6', 0)],
    }

    member_metadata = {}
    for member, topics in six.iteritems(subscriptions):
        member_metadata[member] = build_metadata(topics, member_assignments[member])

    assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    verify_validity_and_balance(subscriptions, assignment)


def test_stickiness(mocker):
    cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2})
    subscriptions = {
        'C1': {'t'},
        'C2': {'t'},
        'C3': {'t'},
        'C4': {'t'},
    }
    member_metadata = make_member_metadata(subscriptions)

    assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    verify_validity_and_balance(subscriptions, assignment)
    partitions_assigned = {}
    for consumer, consumer_assignment in six.iteritems(assignment):
        assert (
            len(consumer_assignment.partitions()) <= 1
        ), 'Consumer {} is assigned more topic partitions than expected.'.format(consumer)
        if len(consumer_assignment.partitions()) == 1:
            partitions_assigned[consumer] = consumer_assignment.partitions()[0]

    # removing the potential group leader
    del subscriptions['C1']
    member_metadata = {}
    for member, topics in six.iteritems(subscriptions):
        member_metadata[member] = build_metadata(topics, assignment[member].partitions())

    assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    verify_validity_and_balance(subscriptions, assignment)
    assert StickyPartitionAssignor._latest_partition_movements.are_sticky()

    for consumer, consumer_assignment in six.iteritems(assignment):
        assert (
            len(consumer_assignment.partitions()) <= 1
        ), 'Consumer {} is assigned more topic partitions than expected.'.format(consumer)
        assert (
            consumer not in partitions_assigned or partitions_assigned[consumer] in consumer_assignment.partitions()
        ), 'Stickiness was not honored for consumer {}'.format(consumer)


def test_assignment_updated_for_deleted_topic(mocker):
    def topic_partitions(topic):
        if topic == 't1':
            return {0}
        if topic == 't3':
            return set(range(100))

    cluster = create_cluster(mocker, topics={'t1', 't3'}, topic_partitions_lambda=topic_partitions)

    subscriptions = {
        'C': {'t1', 't2', 't3'},
    }
    member_metadata = make_member_metadata(subscriptions)

    sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    expected_assignment = {
        'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t1', [0]), ('t3', list(range(100)))], b''),
    }
    assert_assignment(sticky_assignment, expected_assignment)


def test_no_exceptions_when_only_subscribed_topic_is_deleted(mocker):
    cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2})

    subscriptions = {
        'C': {'t'},
    }
    member_metadata = make_member_metadata(subscriptions)

    sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    expected_assignment = {
        'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''),
    }
    assert_assignment(sticky_assignment, expected_assignment)

    subscriptions = {
        'C': {},
    }
    member_metadata = {}
    for member, topics in six.iteritems(subscriptions):
        member_metadata[member] = build_metadata(topics, sticky_assignment[member].partitions())

    cluster = create_cluster(mocker, topics={}, topics_partitions={})
    sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    expected_assignment = {
        'C': ConsumerProtocolMemberAssignment(StickyPartitionAssignor.version, [], b''),
    }
    assert_assignment(sticky_assignment, expected_assignment)


def test_conflicting_previous_assignments(mocker):
    cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1})

    subscriptions = {
        'C1': {'t'},
        'C2': {'t'},
    }
    member_metadata = {}
    for member, topics in six.iteritems(subscriptions):
        # assume both C1 and C2 have partition 1 assigned to them in generation 1
        member_metadata[member] = build_metadata(topics, [TopicPartition('t', 0), TopicPartition('t', 0)], 1)

    assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    verify_validity_and_balance(subscriptions, assignment)


@pytest.mark.parametrize(
    'execution_number,n_topics,n_consumers', [(i, randint(10, 20), randint(20, 40)) for i in range(100)]
)
def test_reassignment_with_random_subscriptions_and_changes(mocker, execution_number, n_topics, n_consumers):
    all_topics = sorted(set(['t{}'.format(i) for i in range(1, n_topics + 1)]))
    partitions = dict([(t, set(range(1, i + 1))) for i, t in enumerate(all_topics)])
    cluster = create_cluster(mocker, topics=all_topics, topic_partitions_lambda=lambda t: partitions[t])

    subscriptions = defaultdict(set)
    for i in range(n_consumers):
        topics_sample = sample(all_topics, randint(1, len(all_topics) - 1))
        subscriptions['C{}'.format(i)].update(topics_sample)

    member_metadata = make_member_metadata(subscriptions)

    assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    verify_validity_and_balance(subscriptions, assignment)

    subscriptions = defaultdict(set)
    for i in range(n_consumers):
        topics_sample = sample(all_topics, randint(1, len(all_topics) - 1))
        subscriptions['C{}'.format(i)].update(topics_sample)

    member_metadata = {}
    for member, topics in six.iteritems(subscriptions):
        member_metadata[member] = build_metadata(topics, assignment[member].partitions())

    assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    verify_validity_and_balance(subscriptions, assignment)
    assert StickyPartitionAssignor._latest_partition_movements.are_sticky()


def test_assignment_with_multiple_generations1(mocker):
    cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2, 3, 4, 5})

    member_metadata = {
        'C1': build_metadata({'t'}, []),
        'C2': build_metadata({'t'}, []),
        'C3': build_metadata({'t'}, []),
    }

    assignment1 = StickyPartitionAssignor.assign(cluster, member_metadata)
    verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment1)
    assert len(assignment1['C1'].assignment[0][1]) == 2
    assert len(assignment1['C2'].assignment[0][1]) == 2
    assert len(assignment1['C3'].assignment[0][1]) == 2

    member_metadata = {
        'C1': build_metadata({'t'}, assignment1['C1'].partitions()),
        'C2': build_metadata({'t'}, assignment1['C2'].partitions()),
    }

    assignment2 = StickyPartitionAssignor.assign(cluster, member_metadata)
    verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}}, assignment2)
    assert len(assignment2['C1'].assignment[0][1]) == 3
    assert len(assignment2['C2'].assignment[0][1]) == 3
    assert all([partition in assignment2['C1'].assignment[0][1] for partition in assignment1['C1'].assignment[0][1]])
    assert all([partition in assignment2['C2'].assignment[0][1] for partition in assignment1['C2'].assignment[0][1]])
    assert StickyPartitionAssignor._latest_partition_movements.are_sticky()

    member_metadata = {
        'C2': build_metadata({'t'}, assignment2['C2'].partitions(), 2),
        'C3': build_metadata({'t'}, assignment1['C3'].partitions(), 1),
    }

    assignment3 = StickyPartitionAssignor.assign(cluster, member_metadata)
    verify_validity_and_balance({'C2': {'t'}, 'C3': {'t'}}, assignment3)
    assert len(assignment3['C2'].assignment[0][1]) == 3
    assert len(assignment3['C3'].assignment[0][1]) == 3
    assert StickyPartitionAssignor._latest_partition_movements.are_sticky()


def test_assignment_with_multiple_generations2(mocker):
    cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2, 3, 4, 5})

    member_metadata = {
        'C1': build_metadata({'t'}, []),
        'C2': build_metadata({'t'}, []),
        'C3': build_metadata({'t'}, []),
    }

    assignment1 = StickyPartitionAssignor.assign(cluster, member_metadata)
    verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment1)
    assert len(assignment1['C1'].assignment[0][1]) == 2
    assert len(assignment1['C2'].assignment[0][1]) == 2
    assert len(assignment1['C3'].assignment[0][1]) == 2

    member_metadata = {
        'C2': build_metadata({'t'}, assignment1['C2'].partitions(), 1),
    }

    assignment2 = StickyPartitionAssignor.assign(cluster, member_metadata)
    verify_validity_and_balance({'C2': {'t'}}, assignment2)
    assert len(assignment2['C2'].assignment[0][1]) == 6
    assert all([partition in assignment2['C2'].assignment[0][1] for partition in assignment1['C2'].assignment[0][1]])
    assert StickyPartitionAssignor._latest_partition_movements.are_sticky()

    member_metadata = {
        'C1': build_metadata({'t'}, assignment1['C1'].partitions(), 1),
        'C2': build_metadata({'t'}, assignment2['C2'].partitions(), 2),
        'C3': build_metadata({'t'}, assignment1['C3'].partitions(), 1),
    }

    assignment3 = StickyPartitionAssignor.assign(cluster, member_metadata)
    verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment3)
    assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
    assert set(assignment3['C1'].assignment[0][1]) == set(assignment1['C1'].assignment[0][1])
    assert set(assignment3['C2'].assignment[0][1]) == set(assignment1['C2'].assignment[0][1])
    assert set(assignment3['C3'].assignment[0][1]) == set(assignment1['C3'].assignment[0][1])


@pytest.mark.parametrize('execution_number', range(50))
def test_assignment_with_conflicting_previous_generations(mocker, execution_number):
    cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2, 3, 4, 5})

    member_assignments = {
        'C1': [TopicPartition('t', p) for p in {0, 1, 4}],
        'C2': [TopicPartition('t', p) for p in {0, 2, 3}],
        'C3': [TopicPartition('t', p) for p in {3, 4, 5}],
    }
    member_generations = {
        'C1': 1,
        'C2': 1,
        'C3': 2,
    }
    member_metadata = {}
    for member in six.iterkeys(member_assignments):
        member_metadata[member] = build_metadata({'t'}, member_assignments[member], member_generations[member])

    assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
    verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment)
    assert StickyPartitionAssignor._latest_partition_movements.are_sticky()


def make_member_metadata(subscriptions):
    member_metadata = {}
    for member, topics in six.iteritems(subscriptions):
        member_metadata[member] = build_metadata(topics, [])
    return member_metadata


def build_metadata(topics, member_assignment_partitions, generation=-1):
    partitions_by_topic = defaultdict(list)
    for topic_partition in member_assignment_partitions:
        partitions_by_topic[topic_partition.topic].append(topic_partition.partition)
    data = StickyAssignorUserDataV1(six.viewitems(partitions_by_topic), generation)
    user_data = data.encode()
    return ConsumerProtocolMemberMetadata(StickyPartitionAssignor.version, list(topics), user_data)


def assert_assignment(result_assignment, expected_assignment):
    assert result_assignment == expected_assignment
    assert set(result_assignment) == set(expected_assignment)
    for member in result_assignment:
        assert result_assignment[member].encode() == expected_assignment[member].encode()


def verify_validity_and_balance(subscriptions, assignment):
    """
    Verifies that the given assignment is valid with respect to the given subscriptions
    Validity requirements:
    - each consumer is subscribed to topics of all partitions assigned to it, and
    - each partition is assigned to no more than one consumer
    Balance requirements:
    - the assignment is fully balanced (the numbers of topic partitions assigned to consumers differ by at most one), or
    - there is no topic partition that can be moved from one consumer to another with 2+ fewer topic partitions

    :param subscriptions  topic subscriptions of each consumer
    :param assignment: given assignment for balance check
    """
    assert six.viewkeys(subscriptions) == six.viewkeys(assignment)

    consumers = sorted(six.viewkeys(assignment))
    for i in range(len(consumers)):
        consumer = consumers[i]
        partitions = assignment[consumer].partitions()
        for partition in partitions:
            assert partition.topic in subscriptions[consumer], (
                'Error: Partition {} is assigned to consumer {}, '
                'but it is not subscribed to topic {}\n'
                'Subscriptions: {}\n'
                'Assignments: {}'.format(partition, consumers[i], partition.topic, subscriptions, assignment)
            )
        if i == len(consumers) - 1:
            continue

        for j in range(i + 1, len(consumers)):
            other_consumer = consumers[j]
            other_partitions = assignment[other_consumer].partitions()
            partitions_intersection = set(partitions).intersection(set(other_partitions))
            assert partitions_intersection == set(), (
                'Error: Consumers {} and {} have common partitions '
                'assigned to them: {}\n'
                'Subscriptions: {}\n'
                'Assignments: {}'.format(consumer, other_consumer, partitions_intersection, subscriptions, assignment)
            )

            if abs(len(partitions) - len(other_partitions)) <= 1:
                continue

            assignments_by_topic = group_partitions_by_topic(partitions)
            other_assignments_by_topic = group_partitions_by_topic(other_partitions)
            if len(partitions) > len(other_partitions):
                for topic in six.iterkeys(assignments_by_topic):
                    assert topic not in other_assignments_by_topic, (
                        'Error: Some partitions can be moved from {} ({} partitions) '
                        'to {} ({} partitions) '
                        'to achieve a better balance\n'
                        'Subscriptions: {}\n'
                        'Assignments: {}'.format(consumer, len(partitions), other_consumer, len(other_partitions), subscriptions, assignment)
                    )
            if len(other_partitions) > len(partitions):
                for topic in six.iterkeys(other_assignments_by_topic):
                    assert topic not in assignments_by_topic, (
                        'Error: Some partitions can be moved from {} ({} partitions) '
                        'to {} ({} partitions) '
                        'to achieve a better balance\n'
                        'Subscriptions: {}\n'
                        'Assignments: {}'.format(other_consumer, len(other_partitions), consumer, len(partitions), subscriptions, assignment)
                    )


def group_partitions_by_topic(partitions):
    result = defaultdict(set)
    for p in partitions:
        result[p.topic].add(p.partition)
    return result
