File: cluster_fixture.py

python-confluent-kafka 1.7.0-4

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from uuid import uuid1

from trivup.clusters.KafkaCluster import KafkaCluster

from confluent_kafka import Consumer, Producer, DeserializingConsumer, \
    SerializingProducer
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka.schema_registry.schema_registry_client import SchemaRegistryClient


class KafkaClusterFixture(object):
    __slots__ = ['_cluster', '_admin', '_producer']

    def __init__(self, conf):
        raise NotImplementedError("KafkaClusterFixture should never be instantiated directly")

    def schema_registry(self, conf=None):
        raise NotImplementedError("schema_registry has not been implemented")

    def client_conf(self, conf=None):
        raise NotImplementedError("client_conf has not been implemented")

    @classmethod
    def _topic_conf(cls, conf=None):
        topic_conf = {'num_partitions': 1,
                      'replication_factor': 1}

        if conf is not None:
            topic_conf.update(conf)

        return topic_conf

    def cimpl_producer(self, conf=None):
        """
        Returns a plain (non-serializing) Producer bound to this cluster.

        Args:
            conf (dict): Producer configuration overrides

        Returns:
            Producer: A new Producer instance
        """
        return Producer(self.client_conf(conf))

    def producer(self, conf=None, key_serializer=None, value_serializer=None):
        """
        Returns a producer bound to this cluster.

        Args:
            conf (dict): Producer configuration overrides

            key_serializer (Serializer): serializer to apply to message key

            value_serializer (Serializer): serializer to apply to
                message value

        Returns:
            SerializingProducer: A new SerializingProducer instance

        """
        client_conf = self.client_conf(conf)

        if key_serializer is not None:
            client_conf['key.serializer'] = key_serializer

        if value_serializer is not None:
            client_conf['value.serializer'] = value_serializer

        return SerializingProducer(client_conf)
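
    # A minimal usage sketch (illustrative only, not part of the fixture API):
    # `cluster` is assumed to be a concrete fixture such as TrivupFixture, and
    # StringSerializer is one of the serializers bundled with confluent_kafka.
    #
    #   from confluent_kafka.serialization import StringSerializer
    #   producer = cluster.producer(value_serializer=StringSerializer('utf_8'))
    #   producer.produce('some-topic', value='hello')
    #   producer.flush()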

    def cimpl_consumer(self, conf=None):
        """
        Returns a plain (non-deserializing) Consumer bound to this cluster.

        Args:
            conf (dict): Consumer config overrides

        Returns:
            Consumer: A new Consumer instance

        """
        consumer_conf = self.client_conf({
            'group.id': str(uuid1()),
            'auto.offset.reset': 'earliest'
        })

        if conf is not None:
            consumer_conf.update(conf)

        return Consumer(consumer_conf)

    def consumer(self, conf=None, key_deserializer=None, value_deserializer=None):
        """
        Returns a consumer bound to this cluster.

        Args:
            conf (dict): Consumer config overrides

            key_deserializer (Deserializer): deserializer to apply to
                message key

            value_deserializer (Deserializer): deserializer to apply to
                message value

        Returns:
            DeserializingConsumer: A new DeserializingConsumer instance

        """
        consumer_conf = self.client_conf({
            'group.id': str(uuid1()),
            'auto.offset.reset': 'earliest'
        })

        if conf is not None:
            consumer_conf.update(conf)

        if key_deserializer is not None:
            consumer_conf['key.deserializer'] = key_deserializer

        if value_deserializer is not None:
            consumer_conf['value.deserializer'] = value_deserializer

        return DeserializingConsumer(consumer_conf)
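
    # A minimal usage sketch (illustrative): pairing the fixture consumer with
    # a bundled deserializer; `cluster` is assumed to be a concrete fixture.
    #
    #   from confluent_kafka.serialization import StringDeserializer
    #   consumer = cluster.consumer(value_deserializer=StringDeserializer('utf_8'))
    #   consumer.subscribe(['some-topic'])
    #   msg = consumer.poll(10.0)  # msg.value() is already a str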

    def create_topic(self, prefix, conf=None):
        """
        Creates a new topic with this cluster.

        :param str prefix: topic name prefix; a unique suffix is appended
        :param dict conf: additions/overrides to topic configuration.
        :returns: The topic's name
        :rtype: str
        """
        if self._admin is None:
            self._admin = AdminClient(self.client_conf())

        name = prefix + "-" + str(uuid1())
        future_topic = self._admin.create_topics([NewTopic(name,
                                                           **self._topic_conf(conf))])

        future_topic.get(name).result()
        return name
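
    # Illustrative override of the default topic configuration (one partition,
    # replication factor one); the "test" prefix is an arbitrary example:
    #
    #   topic = cluster.create_topic('test', {'num_partitions': 3})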

    def seed_topic(self, topic, value_source=None, key_source=None, header_source=None):
        """
        Populates a topic with data asynchronously.

        If there are fewer keys or headers than values, the key and header
        indices wrap around and are reused on subsequent produce calls.

        :param str topic: topic to produce test data to.
        :param list value_source: list or other iterable containing message value data to be produced.
        :param list key_source: list or other iterable containing message key data to be produced.
        :param list(dict) header_source: headers to attach to test data.
        """

        if self._producer is None:
            self._producer = self.producer({'linger.ms': 500})

        if value_source is None:
            value_source = ['test-data{}'.format(i) for i in range(0, 100)]

        if key_source is None:
            key_source = [None]

        KafkaClusterFixture._produce(self._producer, topic, value_source, key_source, header_source)
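
    # An illustrative call with assumed values: seed five messages while
    # cycling through two keys, so the key index wraps as described above.
    #
    #   cluster.seed_topic(topic,
    #                      value_source=['v{}'.format(i) for i in range(5)],
    #                      key_source=['a', 'b'])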

    @staticmethod
    def _produce(producer, topic, value_source, key_source, header_source=None):
        """
        Produces a message for each value in value source to a topic.

        value_source drives the produce loop. If corresponding key and header
        sources are specified they will be applied in round-robin order. In the
        event there are fewer keys or headers than values the index will wrap.

        :param producer: producer instance to use
        :param topic:  topic to produce to
        :param value_source: collection of message values
        :param key_source: optional keys to accompany values_source items
        :param header_source: optional headers to accompany value_source items
        :return:
        """
        num_messages = len(value_source)
        num_keys = len(key_source)
        num_headers = len(header_source) if header_source else 0

        print('# producing {} messages to topic {}'.format(num_messages, topic))
        for i in range(0, num_messages):
            while True:
                try:
                    if header_source:
                        producer.produce(topic,
                                         value=value_source[i],
                                         key=key_source[i % num_keys],
                                         headers=header_source[i % num_headers])
                    else:
                        producer.produce(topic,
                                         value=value_source[i],
                                         key=key_source[i % num_keys])
                    break
                except BufferError:
                    # Local delivery queue is full: serve delivery reports, then retry.
                    producer.poll(0.1)
            producer.poll(0.0)

        producer.flush()

        print('# finished producing {} messages to topic {}'.format(num_messages, topic))


class TrivupFixture(KafkaClusterFixture):
    """
    Starts and stops a Kafka cluster instance for use in testing.

    Optionally provides a Confluent Schema Registry instance for Avro testing.

    """

    default_options = {
        'broker_cnt': 1,
    }

    def __init__(self, conf):
        self._cluster = KafkaCluster(**conf)
        self._admin = None
        self._producer = None
        self._cluster.wait_operational()

    def schema_registry(self, conf=None):
        if not hasattr(self._cluster, 'sr'):
            return None

        sr_conf = {'url': self._cluster.sr.get('url')}
        if conf is not None:
            sr_conf.update(conf)
        return SchemaRegistryClient(sr_conf)

    def client_conf(self, conf=None):
        """
        Default client configuration

        :param dict conf: default client configuration overrides
        :returns: client configuration
        """
        client_conf = self._cluster.client_conf()

        if conf is not None:
            client_conf.update(conf)

        return client_conf

    def stop(self):
        self._cluster.stop(cleanup=True)
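
# An illustrative TrivupFixture setup; the conf keys are forwarded to trivup's
# KafkaCluster, and 'with_sr' (to enable Schema Registry) is assumed to be one
# of trivup's supported options:
#
#   cluster = TrivupFixture({'broker_cnt': 1, 'with_sr': True})
#   sr_client = cluster.schema_registry()  # None if no Schema Registry is up
#   cluster.stop()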


class ByoFixture(KafkaClusterFixture):
    """
    A Kafka cluster fixture that assumes an already running cluster as specified
    by bootstrap.servers, and optionally schema.registry.url, in the conf dict.
    """

    def __init__(self, conf):
        if conf.get("bootstrap.servers", "") == "":
            raise ValueError("'bootstrap.servers' must be set in the "
                             "conf dict")
        self._admin = None
        self._producer = None
        self._conf = conf.copy()
        self._sr_url = self._conf.pop("schema.registry.url", None)

    def schema_registry(self, conf=None):
        if self._sr_url is None:
            raise RuntimeError("No Schema Registry available in Byo cluster")
        sr_conf = {"url": self._sr_url}
        if conf is not None:
            sr_conf.update(conf)
        return SchemaRegistryClient(sr_conf)

    def client_conf(self, conf=None):
        """
        The client configuration
        """
        client_conf = self._conf.copy()
        if conf is not None:
            client_conf.update(conf)

        return client_conf

    def stop(self):
        # The cluster is externally managed; there is nothing to stop.
        pass
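

# --- Illustrative end-to-end sketch (not part of the original fixture) ---
# A minimal round trip against a ByoFixture, assuming a broker is already
# listening on localhost:9092 (the address is an assumption).
if __name__ == '__main__':
    cluster = ByoFixture({'bootstrap.servers': 'localhost:9092'})  # assumed address
    topic = cluster.create_topic('example')  # e.g. "example-<uuid>"
    cluster.seed_topic(topic)                # produces 100 default test messages

    consumer = cluster.cimpl_consumer()
    consumer.subscribe([topic])
    msg = consumer.poll(10.0)                # first seeded message, or None on timeout
    if msg is not None and msg.error() is None:
        print('consumed: {}'.format(msg.value()))
    consumer.close()
    cluster.stop()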