File: test_delete_records.py

package info (click to toggle)
python-confluent-kafka 2.11.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,660 kB
  • sloc: python: 30,428; ansic: 9,487; sh: 1,477; makefile: 192
file content (137 lines) | stat: -rw-r--r-- 5,811 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# -*- coding: utf-8 -*-
# Copyright 2024 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from confluent_kafka.admin import OffsetSpec, DeletedRecords
from confluent_kafka import TopicPartition


def test_delete_records(kafka_cluster):
    """
    Test delete_records: delete the records up to the specified offset in
    the single partition of a topic, then check that the reported low
    watermark equals both the requested offset and the partition's
    earliest available offset as returned by list_offsets.
    """
    admin_client = kafka_cluster.admin()

    # Create a topic with a single partition
    topic = kafka_cluster.create_topic_and_wait_propogation("test-del-records",
                                                            {
                                                                "num_partitions": 1,
                                                                "replication_factor": 1,
                                                            })

    try:
        # Produce three messages to the only partition
        p = kafka_cluster.producer()
        p.produce(topic, "Message-1")
        p.produce(topic, "Message-2")
        p.produce(topic, "Message-3")
        p.flush()

        topic_partition = TopicPartition(topic, 0)
        requests = {topic_partition: OffsetSpec.earliest()}

        # Nothing has been deleted yet, so the earliest available offset is 0.
        # Single-element unpacking also asserts exactly one future came back.
        (earliest_future,) = admin_client.list_offsets(requests).values()
        assert earliest_future.result().offset == 0

        delete_offset = 2
        topic_partition_offset = TopicPartition(topic, 0, delete_offset)

        # Delete the records and wait for the deletion to finish *before*
        # querying offsets again; otherwise the ListOffsets request may be
        # served ahead of DeleteRecords and still observe offset 0.
        (delete_future,) = admin_client.delete_records(
            [topic_partition_offset]).values()
        res = delete_future.result()
        assert isinstance(res, DeletedRecords)
        assert res.low_watermark == delete_offset

        # The earliest available offset must now match the low watermark
        # reported by delete_records.
        (earliest_future,) = admin_client.list_offsets(requests).values()
        assert res.low_watermark == earliest_future.result().offset
    finally:
        # Delete the created topic even if an assertion above failed
        fs = admin_client.delete_topics([topic])
        for f in fs.values():
            f.result()


def test_delete_records_multiple_topics_and_partitions(kafka_cluster):
    """
    Test delete_records with requests spanning several partitions of two
    topics. After each deletion request, the low watermark reported for
    every listed partition must equal both the requested offset and the
    earliest available offset returned by list_offsets.
    """
    admin_client = kafka_cluster.admin()
    num_partitions = 3
    # Create two topics with num_partitions partitions each
    topic = kafka_cluster.create_topic_and_wait_propogation("test-del-records",
                                                            {
                                                                "num_partitions": num_partitions,
                                                                "replication_factor": 1,
                                                            })
    topic2 = kafka_cluster.create_topic_and_wait_propogation("test-del-records2",
                                                             {
                                                                 "num_partitions": num_partitions,
                                                                 "replication_factor": 1,
                                                             })

    topics = [topic, topic2]
    partitions = list(range(num_partitions))
    try:
        # Produce three messages to every partition of both topics
        producer = kafka_cluster.producer()
        for t in topics:
            for partition in partitions:
                producer.produce(t, "Message-1", partition=partition)
                producer.produce(t, "Message-2", partition=partition)
                producer.produce(t, "Message-3", partition=partition)
        producer.flush()

        requests = {
            TopicPartition(t, partition): OffsetSpec.earliest()
            for t in topics
            for partition in partitions
        }
        # Nothing has been deleted yet, so every partition starts at offset 0
        fs = admin_client.list_offsets(requests)
        assert all(f.result().offset == 0 for f in fs.values())

        # In the i-th case every listed partition is deleted up to offset i,
        # so i is the low watermark expected for each of them.
        delete_cases = [
            # Single partition no deletion
            [TopicPartition(topic, 0, 0)],
            # Single topic, two partitions, single record deleted
            [TopicPartition(topic, 0, 1), TopicPartition(topic, 1, 1)],
            # Two topics, four partitions, two records deleted
            [TopicPartition(topic, 2, 2), TopicPartition(topic2, 0, 2),
             TopicPartition(topic2, 1, 2), TopicPartition(topic2, 2, 2)],
        ]
        for expected_low_watermark, delete_partitions in enumerate(delete_cases):
            list_offsets_requests = {
                part: OffsetSpec.earliest() for part in delete_partitions
            }
            # Wait for every deletion to complete before listing offsets so
            # list_offsets observes the post-deletion state.
            futmap_delete = admin_client.delete_records(delete_partitions)
            delete_results = [(part, fut.result())
                              for part, fut in futmap_delete.items()]
            futmap_list = admin_client.list_offsets(list_offsets_requests)
            list_results = {part: fut.result()
                            for part, fut in futmap_list.items()}
            for part, delete_result in delete_results:
                list_result = list_results[part]
                assert isinstance(delete_result, DeletedRecords)
                assert delete_result.low_watermark == list_result.offset
                assert delete_result.low_watermark == expected_low_watermark
    finally:
        # Delete the created topics even if an assertion above failed
        fs = admin_client.delete_topics(topics)
        for f in fs.values():
            f.result()