File: confluent_cloud_example.go

package info (click to toggle)
golang-github-confluentinc-confluent-kafka-go 0.11.6-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bullseye, buster, forky, sid, trixie
  • size: 604 kB
  • sloc: sh: 23; python: 15; ansic: 13; makefile: 9
file content (104 lines) | stat: -rw-r--r-- 3,049 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
// This is a simple example demonstrating how to produce a message to 
// Confluent Cloud then read it back again.
//     
// https://www.confluent.io/confluent-cloud/
// 
// Auto-creation of topics is disabled in Confluent Cloud. You will need to
// use the ccloud CLI to create the go-test-topic topic before running this
// example.
//
// $ ccloud topic create go-test-topic
//
// The <ccloud bootstrap servers>, <ccloud key> and <ccloud secret> parameters
// are available via the Confluent Cloud web interface. For more information,
// refer to the quick-start:
//     
// https://docs.confluent.io/current/cloud-quickstart.html
package main

/**
 * Copyright 2018 Confluent Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import (
	"time"
	"fmt"
	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// main produces a single message to the go-test-topic topic on Confluent
// Cloud, waits for its delivery report, and then consumes the topic from the
// earliest offset, printing every message it reads.
//
// The consume loop has no exit condition: the program runs until the process
// is interrupted.
func main() {

	p, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers":       "<ccloud bootstrap servers>",
		"broker.version.fallback": "0.10.0.0",
		"api.version.fallback.ms": 0,
		"sasl.mechanisms":         "PLAIN",
		"security.protocol":       "SASL_SSL",
		"sasl.username":           "<ccloud key>",
		"sasl.password":           "<ccloud secret>",
	})

	if err != nil {
		panic(fmt.Sprintf("Failed to create producer: %s", err))
	}

	value := "golang test value"
	topic := "go-test-topic"
	err = p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte(value),
	}, nil)

	// If the message could not be enqueued no delivery report will ever
	// arrive, so waiting on the events channel would block forever.
	if err != nil {
		panic(fmt.Sprintf("Failed to produce message: %s", err))
	}

	// Wait for the delivery report. The events channel may also carry
	// non-message events (e.g. kafka.Error), so skip those instead of
	// blindly type-asserting the first event.
	for e := range p.Events() {
		m, ok := e.(*kafka.Message)
		if !ok {
			fmt.Printf("ignored event: %v\n", e)
			continue
		}

		if m.TopicPartition.Error != nil {
			fmt.Printf("failed to deliver message: %v\n", m.TopicPartition)
		} else {
			fmt.Printf("delivered to topic %s [%d] at offset %v\n",
				*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
		}
		break
	}

	p.Close()

	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":       "<ccloud bootstrap servers>",
		"broker.version.fallback": "0.10.0.0",
		"api.version.fallback.ms": 0,
		"sasl.mechanisms":         "PLAIN",
		"security.protocol":       "SASL_SSL",
		"sasl.username":           "<ccloud key>",
		"sasl.password":           "<ccloud secret>",
		"session.timeout.ms":      6000,
		"group.id":                "my-group",
		"default.topic.config":    kafka.ConfigMap{"auto.offset.reset": "earliest"},
	})

	if err != nil {
		panic(fmt.Sprintf("Failed to create consumer: %s", err))
	}
	// Deferred so the consumer is also closed when a panic below unwinds
	// main; a Close() placed after the endless loop would be unreachable.
	defer c.Close()

	topics := []string{topic}
	if err := c.SubscribeTopics(topics, nil); err != nil {
		panic(fmt.Sprintf("Failed to subscribe to topics: %s", err))
	}

	for {
		msg, err := c.ReadMessage(100 * time.Millisecond)
		if err == nil {
			fmt.Printf("consumed: %s: %s\n", msg.TopicPartition, string(msg.Value))
			continue
		}

		// A poll timeout just means no message arrived within the
		// interval; anything else is worth reporting.
		if kerr, ok := err.(kafka.Error); ok && kerr.Code() == kafka.ErrTimedOut {
			continue
		}
		fmt.Printf("consumer error: %v\n", err)
	}
}