File: cluster_fixture.py (python-confluent-kafka 2.11.1-1)

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2020 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import time
from uuid import uuid1

from trivup.clusters.KafkaCluster import KafkaCluster

from confluent_kafka import Producer, SerializingProducer
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka.schema_registry.schema_registry_client import SchemaRegistryClient
from confluent_kafka.schema_registry._async.schema_registry_client import AsyncSchemaRegistryClient

from tests.common import TestConsumer
from tests.common.schema_registry import TestDeserializingConsumer
from tests.common._async.consumer import TestAsyncDeserializingConsumer
from tests.common._async.producer import TestAsyncSerializingProducer


class KafkaClusterFixture(object):
    __slots__ = ['_cluster', '_admin', '_producer']

    def __init__(self, conf):
        raise NotImplementedError("KafkaCluster should never be instantiated directly")

    def schema_registry(self, conf=None):
        raise NotImplementedError("schema_registry has not been implemented")

    def client_conf(self, conf=None):
        raise NotImplementedError("client_conf has not been implemented")

    @classmethod
    def _topic_conf(cls, conf=None):
        topic_conf = {'num_partitions': 1,
                      'replication_factor': 1}

        if conf is not None:
            topic_conf.update(conf)

        return topic_conf

    def cimpl_producer(self, conf=None):
        """
        Returns a producer bound to this cluster.

        Args:
            conf (dict): Producer configuration overrides

        Returns:
            Producer: A new Producer instance
        """
        return Producer(self.client_conf(conf))

    def producer(self, conf=None, key_serializer=None, value_serializer=None):
        """
        Returns a producer bound to this cluster.

        Args:
            conf (dict): Producer configuration overrides

            key_serializer (Serializer): serializer to apply to message key

            value_serializer (Serializer): serializer to apply to
                message value

        Returns:
            SerializingProducer: A new SerializingProducer instance

        """
        client_conf = self.client_conf(conf)

        if key_serializer is not None:
            client_conf['key.serializer'] = key_serializer

        if value_serializer is not None:
            client_conf['value.serializer'] = value_serializer

        return SerializingProducer(client_conf)
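
    # Usage sketch (illustrative comment only; the serializer choice and
    # topic name are assumptions): a serializing producer is typically
    # built and used like this:
    #
    #   from confluent_kafka.serialization import StringSerializer
    #
    #   producer = cluster.producer(conf={'linger.ms': 100},
    #                               key_serializer=StringSerializer('utf_8'),
    #                               value_serializer=StringSerializer('utf_8'))
    #   producer.produce('some-topic', key='k0', value='v0')
    #   producer.flush()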

    def async_producer(self, conf=None, key_serializer=None, value_serializer=None):
        """
        Returns an async producer bound to this cluster.

        Args:
            conf (dict): Producer configuration overrides

            key_serializer (Serializer): serializer to apply to message key

            value_serializer (Serializer): serializer to apply to
                message value

        Returns:
            TestAsyncSerializingProducer: A new async SerializingProducer instance

        """
        client_conf = self.client_conf(conf)

        if key_serializer is not None:
            client_conf['key.serializer'] = key_serializer

        if value_serializer is not None:
            client_conf['value.serializer'] = value_serializer

        return TestAsyncSerializingProducer(client_conf)
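
    # Usage sketch (illustrative comment only; assumes the async test
    # producer exposes awaitable produce()/flush(), and `some_serializer`
    # is a placeholder):
    #
    #   producer = cluster.async_producer(value_serializer=some_serializer)
    #   await producer.produce('some-topic', value='v0')
    #   await producer.flush()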

    def cimpl_consumer(self, conf=None):
        """
        Returns a consumer bound to this cluster.

        Args:
            conf (dict): Consumer config overrides

        Returns:
            Consumer: A new Consumer instance

        """
        consumer_conf = self.client_conf({
            'group.id': str(uuid1()),
            'auto.offset.reset': 'earliest'
        })

        if conf is not None:
            consumer_conf.update(conf)

        return TestConsumer(consumer_conf)

    def consumer(self, conf=None, key_deserializer=None, value_deserializer=None):
        """
        Returns a consumer bound to this cluster.

        Args:
            conf (dict): Consumer config overrides

            key_deserializer (Deserializer): deserializer to apply to
                message key

            value_deserializer (Deserializer): deserializer to apply to
                message value

        Returns:
            DeserializingConsumer: A new DeserializingConsumer instance

        """
        consumer_conf = self.client_conf({
            'group.id': str(uuid1()),
            'auto.offset.reset': 'earliest'
        })

        if conf is not None:
            consumer_conf.update(conf)

        if key_deserializer is not None:
            consumer_conf['key.deserializer'] = key_deserializer

        if value_deserializer is not None:
            consumer_conf['value.deserializer'] = value_deserializer

        return TestDeserializingConsumer(consumer_conf)
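
    # Usage sketch (illustrative comment only; the deserializer choice and
    # topic name are assumptions): a deserializing consumer is typically
    # built and used like this:
    #
    #   from confluent_kafka.serialization import StringDeserializer
    #
    #   consumer = cluster.consumer(
    #       key_deserializer=StringDeserializer('utf_8'),
    #       value_deserializer=StringDeserializer('utf_8'))
    #   consumer.subscribe(['some-topic'])
    #   msg = consumer.poll(10.0)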

    def async_consumer(self, conf=None, key_deserializer=None, value_deserializer=None):
        """
        Returns an async consumer bound to this cluster.

        Args:
            conf (dict): Consumer config overrides

            key_deserializer (Deserializer): deserializer to apply to
                message key

            value_deserializer (Deserializer): deserializer to apply to
                message value

        Returns:
            TestAsyncDeserializingConsumer: A new async DeserializingConsumer instance

        """
        consumer_conf = self.client_conf({
            'group.id': str(uuid1()),
            'auto.offset.reset': 'earliest'
        })

        if conf is not None:
            consumer_conf.update(conf)

        if key_deserializer is not None:
            consumer_conf['key.deserializer'] = key_deserializer

        if value_deserializer is not None:
            consumer_conf['value.deserializer'] = value_deserializer

        return TestAsyncDeserializingConsumer(consumer_conf)

    def admin(self, conf=None):
        if conf:
            # When conf is passed create a new AdminClient
            admin_conf = self.client_conf()
            admin_conf.update(conf)
            return AdminClient(admin_conf)

        # Otherwise use a common one
        if self._admin is None:
            self._admin = AdminClient(self.client_conf())
        return self._admin
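
    # Usage sketch (illustrative comment only; the override key is just an
    # example librdkafka setting): the cached AdminClient is shared across
    # calls, while passing overrides yields a fresh instance:
    #
    #   admin = cluster.admin()                               # shared client
    #   tuned = cluster.admin({'request.timeout.ms': 30000})  # new client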

    def create_topic(self, prefix, conf=None, **create_topic_kwargs):
        """
        Creates a new topic with this cluster.

        :param str prefix: topic name prefix; a uuid1 suffix is appended to form the full name
        :param dict conf: additions/overrides to topic configuration.
        :returns: The topic's name
        :rtype: str
        """
        name = prefix + "-" + str(uuid1())
        future_topic = self.admin().create_topics([NewTopic(name,
                                                            **self._topic_conf(conf))],
                                                  **create_topic_kwargs)

        future_topic.get(name).result()
        return name
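
    # Usage sketch (illustrative comment only; prefix and partition count
    # are assumptions): the returned name embeds a uuid1 suffix, so repeated
    # calls with the same prefix never collide:
    #
    #   topic = cluster.create_topic('orders', {'num_partitions': 3})
    #   # -> e.g. 'orders-<uuid1>', 3 partitions, replication factor 1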

    def create_topic_and_wait_propogation(self, prefix, conf=None, **create_topic_kwargs):
        """
        Creates a new topic with this cluster and waits for the topic to propagate to all brokers.

        :param str prefix: topic name prefix; a uuid1 suffix is appended to form the full name
        :param dict conf: additions/overrides to topic configuration.
        :returns: The topic's name
        :rtype: str
        """
        name = self.create_topic(prefix, conf, **create_topic_kwargs)

        # Wait for topic propagation across all brokers.
        # FIXME: find a better way to wait for topic creation across
        #        all client languages, given that the option to send a
        #        request to a specific broker isn't present everywhere.
        time.sleep(1)

        return name

    def seed_topic(self, topic, value_source=None, key_source=None, header_source=None):
        """
        Populates a topic with data asynchronously.

        If there are fewer keys or headers than values, the index wraps around
        and entries are reused for subsequent produce requests.

        :param str topic: topic to produce test data to.
        :param list value_source: list or other iterable containing message value data to be produced.
        :param list key_source: list or other iterable containing message key data to be produced.
        :param list(dict) header_source: headers to attach to test data.
        """

        if self._producer is None:
            self._producer = self.producer(self.client_conf({'linger.ms': 500}))

        if value_source is None:
            value_source = ['test-data{}'.format(i) for i in range(0, 100)]

        if key_source is None:
            key_source = [None]

        KafkaClusterFixture._produce(self._producer, topic, value_source, key_source, header_source)
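
    # Usage sketch (illustrative comment only; the byte payloads are
    # assumptions): seeding with defaults produces 100 'test-data{i}'
    # values with no keys; explicit sources wrap when shorter:
    #
    #   cluster.seed_topic(topic)                      # default payloads
    #   cluster.seed_topic(topic, value_source=[b'a', b'b', b'c'],
    #                      key_source=[b'k0', b'k1'])  # keys wrap: k0,k1,k0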

    @staticmethod
    def _produce(producer, topic, value_source, key_source, header_source=None):
        """
        Produces a message for each value in value source to a topic.

        value_source drives the produce loop. If corresponding key and header sources
        are specified they are applied in round-robin order. If there are fewer keys
        or headers than values, the index wraps around.

        :param producer: producer instance to use
        :param topic:  topic to produce to
        :param value_source: collection of message values
        :param key_source: optional keys to accompany value_source items
        :param header_source: optional headers to accompany value_source items
        :return:
        """
        num_messages = len(value_source)
        num_keys = len(key_source)
        num_headers = len(header_source) if header_source else 0

        print('# producing {} messages to topic {}'.format(num_messages, topic))
        for i in range(0, num_messages):
            while True:
                try:
                    if header_source is not None:
                        producer.produce(topic,
                                         value=value_source[i],
                                         key=key_source[i % num_keys],
                                         # wrap on num_headers; the previous
                                         # modulus (num_headers + 1) could
                                         # index past the end of the list
                                         headers=header_source[i % num_headers])
                    else:
                        producer.produce(topic,
                                         value=value_source[i],
                                         key=key_source[i % num_keys])
                    break
                except BufferError:
                    # Local produce queue is full; serve delivery reports and retry.
                    producer.poll(0.1)
            producer.poll(0.0)

        producer.flush()

        print('# finished producing {} messages to topic {}'.format(num_messages, topic))


class TrivupFixture(KafkaClusterFixture):
    """
    Starts and stops a Kafka cluster instance for use in testing.

    Optionally provides a Confluent Schema Registry instance for Avro testing.

    """

    default_options = {
        'broker_cnt': 1,
    }

    def __init__(self, conf):
        self._cluster = KafkaCluster(**conf)
        self._admin = None
        self._producer = None
        self._cluster.wait_operational()

    def schema_registry(self, conf=None):
        if not hasattr(self._cluster, 'sr'):
            return None

        sr_conf = {'url': self._cluster.sr.get('url')}
        if conf is not None:
            sr_conf.update(conf)
        return SchemaRegistryClient(sr_conf)
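
    # Usage sketch (illustrative comment only; the subject name is a
    # placeholder): callers guard against clusters started without a
    # schema registry:
    #
    #   sr = cluster.schema_registry()
    #   if sr is not None:
    #       latest = sr.get_latest_version('some-subject')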

    def async_schema_registry(self, conf=None):
        if not hasattr(self._cluster, 'sr'):
            return None

        sr_conf = {'url': self._cluster.sr.get('url')}
        if conf is not None:
            sr_conf.update(conf)
        return AsyncSchemaRegistryClient(sr_conf)

    def client_conf(self, conf=None):
        """
        Default client configuration

        :param dict conf: default client configuration overrides
        :returns: client configuration
        """
        client_conf = self._cluster.client_conf()

        if conf is not None:
            client_conf.update(conf)

        return client_conf

    def stop(self):
        self._cluster.stop(cleanup=True)


class ByoFixture(KafkaClusterFixture):
    """
    A Kafka cluster fixture that assumes an already running cluster as specified
    by bootstrap.servers, and optionally schema.registry.url, in the conf dict.
    """

    def __init__(self, conf):
        if conf.get("bootstrap.servers", "") == "":
            raise ValueError("'bootstrap.servers' must be set in the "
                             "conf dict")
        self._admin = None
        self._producer = None
        self._conf = conf.copy()
        self._sr_url = self._conf.pop("schema.registry.url", None)

    def schema_registry(self, conf=None):
        if self._sr_url is None:
            raise RuntimeError("No Schema-registry available in Byo cluster")
        # Merge any overrides instead of silently ignoring the conf argument.
        sr_conf = {"url": self._sr_url}
        if conf is not None:
            sr_conf.update(conf)
        return SchemaRegistryClient(sr_conf)

    def client_conf(self, conf=None):
        """
        The client configuration
        """
        client_conf = self._conf.copy()
        if conf is not None:
            client_conf.update(conf)

        return client_conf

    def stop(self):
        pass
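
# Usage sketch (illustrative comment only; the addresses are placeholders):
# a BYO ("bring your own") fixture points at an already-running cluster,
# so stop() is a no-op:
#
#   cluster = ByoFixture({'bootstrap.servers': 'localhost:9092',
#                         'schema.registry.url': 'http://localhost:8081'})
#   topic = cluster.create_topic_and_wait_propogation('byo-test')
#   cluster.seed_topic(topic)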