1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243
|
package kafka
/**
* Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import (
"fmt"
"reflect"
"strings"
"unsafe"
)
/*
#include <stdlib.h>
#include <librdkafka/rdkafka.h>
*/
import "C"
// ConfigValue supports the following types:
// bool, int, string, any type with the standard String() interface
type ConfigValue interface{}
// ConfigMap is a map contaning standard librdkafka configuration properties as documented in:
// https://github.com/edenhill/librdkafka/tree/master/CONFIGURATION.md
//
// The special property "default.topic.config" (optional) is a ConfigMap containing default topic
// configuration properties.
type ConfigMap map[string]ConfigValue
// SetKey sets configuration property key to value.
// For user convenience a key prefixed with {topic}. will be
// set on the "default.topic.config" sub-map.
func (m ConfigMap) SetKey(key string, value ConfigValue) error {
if strings.HasPrefix(key, "{topic}.") {
_, found := m["default.topic.config"]
if !found {
m["default.topic.config"] = ConfigMap{}
}
m["default.topic.config"].(ConfigMap)[strings.TrimPrefix(key, "{topic}.")] = value
} else {
m[key] = value
}
return nil
}
// Set implements flag.Set (command line argument parser) as a convenience
// for `-X key=value` config.
func (m ConfigMap) Set(kv string) error {
i := strings.Index(kv, "=")
if i == -1 {
return Error{ErrInvalidArg, "Expected key=value"}
}
k := kv[:i]
v := kv[i+1:]
return m.SetKey(k, v)
}
func value2string(v ConfigValue) (ret string, errstr string) {
switch x := v.(type) {
case bool:
if x {
ret = "true"
} else {
ret = "false"
}
case int:
ret = fmt.Sprintf("%d", x)
case string:
ret = x
case fmt.Stringer:
ret = x.String()
default:
return "", fmt.Sprintf("Invalid value type %T", v)
}
return ret, ""
}
// rdkAnyconf abstracts rd_kafka_conf_t and rd_kafka_topic_conf_t
// into a common interface.
type rdkAnyconf interface {
set(cKey *C.char, cVal *C.char, cErrstr *C.char, errstrSize int) C.rd_kafka_conf_res_t
}
func anyconfSet(anyconf rdkAnyconf, key string, val ConfigValue) (err error) {
value, errstr := value2string(val)
if errstr != "" {
return Error{ErrInvalidArg, fmt.Sprintf("%s for key %s (expected string,bool,int,ConfigMap)", errstr, key)}
}
cKey := C.CString(key)
cVal := C.CString(value)
cErrstr := (*C.char)(C.malloc(C.size_t(128)))
defer C.free(unsafe.Pointer(cErrstr))
if anyconf.set(cKey, cVal, cErrstr, 128) != C.RD_KAFKA_CONF_OK {
C.free(unsafe.Pointer(cKey))
C.free(unsafe.Pointer(cVal))
return newErrorFromCString(C.RD_KAFKA_RESP_ERR__INVALID_ARG, cErrstr)
}
return nil
}
// we need these typedefs to workaround a crash in golint
// when parsing the set() methods below
type rdkConf C.rd_kafka_conf_t
type rdkTopicConf C.rd_kafka_topic_conf_t
func (cConf *rdkConf) set(cKey *C.char, cVal *C.char, cErrstr *C.char, errstrSize int) C.rd_kafka_conf_res_t {
return C.rd_kafka_conf_set((*C.rd_kafka_conf_t)(cConf), cKey, cVal, cErrstr, C.size_t(errstrSize))
}
func (ctopicConf *rdkTopicConf) set(cKey *C.char, cVal *C.char, cErrstr *C.char, errstrSize int) C.rd_kafka_conf_res_t {
return C.rd_kafka_topic_conf_set((*C.rd_kafka_topic_conf_t)(ctopicConf), cKey, cVal, cErrstr, C.size_t(errstrSize))
}
func configConvertAnyconf(m ConfigMap, anyconf rdkAnyconf) (err error) {
// set plugins first, any plugin-specific configuration depends on
// the plugin to have already been set
pluginPaths, ok := m["plugin.library.paths"]
if ok {
err = anyconfSet(anyconf, "plugin.library.paths", pluginPaths)
if err != nil {
return err
}
}
for k, v := range m {
if k == "plugin.library.paths" {
continue
}
switch v.(type) {
case ConfigMap:
/* Special sub-ConfigMap, only used for default.topic.config */
if k != "default.topic.config" {
return Error{ErrInvalidArg, fmt.Sprintf("Invalid type for key %s", k)}
}
var cTopicConf = C.rd_kafka_topic_conf_new()
err = configConvertAnyconf(v.(ConfigMap),
(*rdkTopicConf)(cTopicConf))
if err != nil {
C.rd_kafka_topic_conf_destroy(cTopicConf)
return err
}
C.rd_kafka_conf_set_default_topic_conf(
(*C.rd_kafka_conf_t)(anyconf.(*rdkConf)),
(*C.rd_kafka_topic_conf_t)((*rdkTopicConf)(cTopicConf)))
default:
err = anyconfSet(anyconf, k, v)
if err != nil {
return err
}
}
}
return nil
}
// convert ConfigMap to C rd_kafka_conf_t *
func (m ConfigMap) convert() (cConf *C.rd_kafka_conf_t, err error) {
cConf = C.rd_kafka_conf_new()
err = configConvertAnyconf(m, (*rdkConf)(cConf))
if err != nil {
C.rd_kafka_conf_destroy(cConf)
return nil, err
}
return cConf, nil
}
// get finds key in the configmap and returns its value.
// If the key is not found defval is returned.
// If the key is found but the type is mismatched an error is returned.
func (m ConfigMap) get(key string, defval ConfigValue) (ConfigValue, error) {
if strings.HasPrefix(key, "{topic}.") {
defconfCv, found := m["default.topic.config"]
if !found {
return defval, nil
}
return defconfCv.(ConfigMap).get(strings.TrimPrefix(key, "{topic}."), defval)
}
v, ok := m[key]
if !ok {
return defval, nil
}
if defval != nil && reflect.TypeOf(defval) != reflect.TypeOf(v) {
return nil, Error{ErrInvalidArg, fmt.Sprintf("%s expects type %T, not %T", key, defval, v)}
}
return v, nil
}
// extract performs a get() and if found deletes the key.
func (m ConfigMap) extract(key string, defval ConfigValue) (ConfigValue, error) {
v, err := m.get(key, defval)
if err != nil {
return nil, err
}
delete(m, key)
return v, nil
}
func (m ConfigMap) clone() ConfigMap {
m2 := make(ConfigMap)
for k, v := range m {
m2[k] = v
}
return m2
}
// Get finds the given key in the ConfigMap and returns its value.
// If the key is not found `defval` is returned.
// If the key is found but the type does not match that of `defval` (unless nil)
// an ErrInvalidArg error is returned.
func (m ConfigMap) Get(key string, defval ConfigValue) (ConfigValue, error) {
return m.get(key, defval)
}
|