1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
|
#!/usr/bin/env python
#
# Copyright 2018 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import argparse
from uuid import uuid4
from six.moves import input
from confluent_kafka import avro
# Avro schema (JSON) describing the User record. It is kept as a separate
# constant so the raw definition is easy to locate and compare against the
# User class fields.
_USER_SCHEMA_JSON = """
{
"namespace": "confluent.io.examples.serialization.avro",
"name": "User",
"type": "record",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": "int"},
{"name": "favorite_color", "type": "string"}
]
}
"""

# Parsed schema object, shared by the producer (writer schema) and the
# consumer (reader schema) below.
record_schema = avro.loads(_USER_SCHEMA_JSON)
class User(object):
    """
    In-memory form of a single User Avro record.

    Besides the three schema fields, each instance carries an ``id`` that is
    used only locally to correlate delivery reports; it is never part of the
    dict returned by :meth:`to_dict`.
    """

    # Fixed attribute set declared up front (no per-instance __dict__).
    __slots__ = ["name", "favorite_number", "favorite_color", "id"]

    def __init__(self, name=None, favorite_number=None, favorite_color=None):
        # Random per-instance token for tracking produce success/failure.
        # Deliberately excluded from the serialized payload.
        self.id = uuid4()
        self.name, self.favorite_number, self.favorite_color = (
            name, favorite_number, favorite_color)

    def to_dict(self):
        """
        Return the record as a plain dict keyed by the Avro schema field names.

        The Avro Python library offers no code generation, so serialization
        is fed a dict rather than this class instance.
        """
        schema_fields = ("name", "favorite_number", "favorite_color")
        return {field: getattr(self, field) for field in schema_fields}
def on_delivery(err, msg, obj):
    """
    Report the outcome of one produce request (served from producer.poll).

    :param err: Delivery error, or None when the message was delivered.
    :param msg: The produced message; its topic, partition and offset are
        printed on success.
    :param obj: The original, unserialized User. It is passed through so the
        report can name the record in readable form.
    """
    if err is None:
        print('Message {} successfully produced to {} [{}] at offset {}'.format(
            obj.id, msg.topic(), msg.partition(), msg.offset()))
        return

    print('Message {} delivery failed for user {} with error {}'.format(
        obj.id, obj.name, err))
def produce(topic, conf):
    """
    Produce User records read interactively from stdin.

    Each iteration prompts for the three User fields and sends the record to
    ``topic`` via an AvroProducer using ``record_schema`` as the value schema.
    The loop ends on Ctrl-C (KeyboardInterrupt) or end of input (EOFError),
    after which the producer is flushed.

    :param str topic: Topic to produce to.
    :param dict conf: AvroProducer configuration (bootstrap servers, Schema
        Registry URL and optional auth settings, as built by main()).
    """
    from confluent_kafka.avro import AvroProducer

    producer = AvroProducer(conf, default_value_schema=record_schema)

    print("Producing user records to topic {}. ^c to exit.".format(topic))
    while True:
        # Instantiate new User, populate fields, produce record, execute callbacks.
        record = User()
        try:
            record.name = input("Enter name: ")
            record.favorite_number = int(input("Enter favorite number: "))
            record.favorite_color = input("Enter favorite color: ")

            # The message passed to the delivery callback will already be serialized.
            # To aid in debugging we provide the original object to the delivery callback.
            # Binding obj=record as a default captures this iteration's record,
            # not whichever record the loop variable points to later.
            producer.produce(topic=topic, value=record.to_dict(),
                             callback=lambda err, msg, obj=record: on_delivery(err, msg, obj))
            # Serve on_delivery callbacks from previous asynchronous produce()
            producer.poll(0)
        except (KeyboardInterrupt, EOFError):
            # EOFError means stdin was closed (Ctrl-D or end of piped input).
            # Stop the loop instead of crashing so queued records are flushed.
            break
        except ValueError:
            # e.g. favorite_number could not be parsed as an int.
            print("Invalid input, discarding record...")
            continue

    print("\nFlushing records...")
    producer.flush()
def consume(topic, conf):
    """
    Consume User records from ``topic`` and print them until interrupted.

    Messages are deserialized by an AvroConsumer using ``record_schema`` as
    the reader schema. Malformed records and consumer errors are reported and
    skipped. The consumer is always closed on exit.

    :param str topic: Topic to subscribe to.
    :param dict conf: AvroConsumer configuration; must contain 'group.id'
        (read below for the banner) plus the broker / Schema Registry settings.
    """
    from confluent_kafka.avro import AvroConsumer
    from confluent_kafka.avro.serializer import SerializerError

    print("Consuming user records from topic {} with group {}. ^c to exit.".format(topic, conf["group.id"]))

    c = AvroConsumer(conf, reader_value_schema=record_schema)
    try:
        c.subscribe([topic])

        while True:
            try:
                msg = c.poll(1)

                # There were no messages on the queue, continue polling
                if msg is None:
                    continue

                if msg.error():
                    print("Consumer error: {}".format(msg.error()))
                    continue

                value = msg.value()
                if value is None:
                    # A message without a value has no User fields to show.
                    print("Skipping message with null value")
                    continue

                # Map the deserialized record onto User field by field. Passing
                # the whole value positionally (as before) stored the entire
                # dict in `name` and left the other fields None.
                # NOTE(review): assumes the value is a dict keyed by the
                # record_schema field names; confirm against the deserializer.
                record = User(name=value.get("name"),
                              favorite_number=value.get("favorite_number"),
                              favorite_color=value.get("favorite_color"))
                print("name: {}\n\tfavorite_number: {}\n\tfavorite_color: {}\n".format(
                    record.name, record.favorite_number, record.favorite_color))
            except SerializerError as e:
                # Report malformed record, discard results, continue polling
                print("Message deserialization failed {}".format(e))
                continue
            except KeyboardInterrupt:
                break
    finally:
        # Close even if an unexpected exception escapes the loop.
        print("Shutting down consumer..")
        c.close()
def main(args):
    """
    Build the client configuration from parsed CLI arguments and run the
    selected mode.

    :param argparse.Namespace args: Parsed arguments with bootstrap_servers,
        schema_registry, userinfo, mode, topic and group attributes.
    """
    # Settings shared by producer and consumer.
    conf = {
        'bootstrap.servers': args.bootstrap_servers,
        'schema.registry.url': args.schema_registry,
    }

    # An empty/false userinfo leaves Schema Registry basic auth disabled.
    if args.userinfo:
        conf.update({
            'schema.registry.basic.auth.credentials.source': 'USER_INFO',
            'schema.registry.basic.auth.user.info': args.userinfo,
        })

    if args.mode != "produce":
        # Fallback to earliest to ensure all messages are consumed
        conf['group.id'] = args.group
        conf['auto.offset.reset'] = "earliest"
        consume(args.topic, conf)
        return

    produce(args.topic, conf)
if __name__ == '__main__':
    # To use the provided cluster execute <source root>/tests/docker/bin/cluster_up.sh.
    # Defaults assume the use of the provided test cluster.
    parser = argparse.ArgumentParser(description="Example client for handling Avro data")
    parser.add_argument('-b', dest="bootstrap_servers",
                        default="localhost:29092", help="Bootstrap broker(s) (host[:port])")
    # Help text previously had an unbalanced parenthesis.
    parser.add_argument('-s', dest="schema_registry",
                        default="http://localhost:8083", help="Schema Registry (http(s)://host[:port])")
    parser.add_argument('-t', dest="topic", default="example_avro",
                        help="Topic name")
    # main() only enables basic auth when userinfo is truthy, so an empty
    # string turns it off.
    parser.add_argument('-u', dest="userinfo", default="ckp_tester:test_secret",
                        help="Userinfo (username:password); requires Schema Registry with HTTP basic auth "
                             "enabled; pass an empty string to disable")
    parser.add_argument('mode', choices=['produce', 'consume'],
                        help="Execution mode (produce | consume)")
    # The mode is named 'consume' (not 'consumer'), and -g has a default, so
    # it is not actually required.
    parser.add_argument('-g', dest="group", default="example_avro",
                        help="Consumer group; only used in 'consume' mode")
    main(parser.parse_args())
|