File: kafka_options.py

package info (click to toggle)
python-oslo.messaging 17.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 1,692 kB
  • sloc: python: 12,072; sh: 62; makefile: 24
file content (69 lines) | stat: -rw-r--r-- 3,210 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from oslo_config import cfg

from oslo_messaging._drivers import common

# Options for the Kafka driver. They are registered under the
# ``oslo_messaging_kafka`` group by ``register_opts()`` below.
KAFKA_OPTS = [
    # --- Consumer tuning ---
    cfg.IntOpt('kafka_max_fetch_bytes', default=1024 * 1024,
               help='Max fetch bytes of Kafka consumer'),
    cfg.FloatOpt('kafka_consumer_timeout', default=1.0,
                 help='Default timeout(s) for Kafka consumers'),
    cfg.StrOpt('consumer_group', default="oslo_messaging_consumer",
               help='Group id for Kafka consumer. Consumers in one group '
                    'will coordinate message consumption'),

    # --- Producer tuning ---
    cfg.FloatOpt('producer_batch_timeout', default=0.,
                 help="Upper bound on the delay for KafkaProducer batching "
                      "in seconds"),
    cfg.IntOpt('producer_batch_size', default=16384,
               help='Size of batch for the producer async send'),
    cfg.StrOpt('compression_codec', default='none',
               choices=['none', 'gzip', 'snappy', 'lz4', 'zstd'],
               help='The compression codec for all data generated by the '
                    'producer. If not set, compression will not be used. '
                    'Note that the allowed values of this depend on the kafka '
                    'version'),

    # --- Consumer commit / polling behaviour ---
    cfg.BoolOpt('enable_auto_commit',
                default=False,
                help='Enable asynchronous consumer commits'),
    cfg.IntOpt('max_poll_records', default=500,
               help='The maximum number of records returned in a poll call'),

    # --- Transport security ---
    cfg.StrOpt('security_protocol', default='PLAINTEXT',
               choices=('PLAINTEXT', 'SASL_PLAINTEXT', 'SSL', 'SASL_SSL'),
               help='Protocol used to communicate with brokers'),
    cfg.StrOpt('sasl_mechanism',
               default='PLAIN',
               help='Mechanism when security protocol is SASL'),
    cfg.StrOpt('ssl_cafile',
               default='',
               help='CA certificate PEM file used to verify the server'
               ' certificate'),
    cfg.StrOpt('ssl_client_cert_file',
               default='',
               help='Client certificate PEM file used for authentication.'),
    cfg.StrOpt('ssl_client_key_file',
               default='',
               help='Client key PEM file used for authentication.'),
    # secret=True asks oslo.config to mask this value when option values
    # are written to logs; credentials must not appear there in clear text.
    # NOTE(review): the help text says "password file" while the option
    # name suggests the password itself -- confirm against the driver
    # before rewording the help string.
    cfg.StrOpt('ssl_client_key_password',
               default='',
               secret=True,
               help='Client key password file used for authentication.')
]


def register_opts(conf, url):
    """Register the Kafka driver options and return a proxy over them.

    Creates the ``oslo_messaging_kafka`` option group, registers the group
    and every option in ``KAFKA_OPTS`` on ``conf``, then builds a
    ``common.ConfigOptsProxy`` from ``conf``, ``url`` and the group name.

    :param conf: configuration object exposing ``register_group()`` and
                 ``register_opts()``
    :param url: passed through unchanged to ``common.ConfigOptsProxy``
    :returns: the ``common.ConfigOptsProxy`` instance
    """
    kafka_group = cfg.OptGroup(
        title='Kafka driver options',
        name='oslo_messaging_kafka',
    )

    # The group is registered first, then the options are attached to it.
    conf.register_group(kafka_group)
    conf.register_opts(KAFKA_OPTS, group=kafka_group)

    proxy = common.ConfigOptsProxy(conf, url, kafka_group.name)
    return proxy