1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
|
/*
* kafkacat - Apache Kafka consumer and producer
*
* Copyright (c) 2015-2019, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#pragma once
#include <inttypes.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <librdkafka/rdkafka.h>
#include "rdport.h"
#ifdef _MSC_VER
#pragma comment(lib, "librdkafka.lib")
#include "win32/win32_config.h"
#else
#include "config.h"
#endif
#if ENABLE_AVRO
#include <libserdes/serdes.h>
#endif
#ifdef RD_KAFKA_V_HEADER
#define HAVE_HEADERS 1
#else
#define HAVE_HEADERS 0
#endif
#if RD_KAFKA_VERSION >= 0x000b0500
#define HAVE_CONTROLLERID 1
#else
#define HAVE_CONTROLLERID 0
#endif
typedef enum {
KC_FMT_STR,
KC_FMT_OFFSET,
KC_FMT_KEY,
KC_FMT_KEY_LEN,
KC_FMT_PAYLOAD,
KC_FMT_PAYLOAD_LEN,
KC_FMT_PAYLOAD_LEN_BINARY,
KC_FMT_TOPIC,
KC_FMT_PARTITION,
KC_FMT_TIMESTAMP,
KC_FMT_HEADERS
} fmt_type_t;
#define KC_FMT_MAX_SIZE 128
typedef enum {
KC_MSG_FIELD_VALUE,
KC_MSG_FIELD_KEY,
KC_MSG_FIELD_CNT
} kc_msg_field_t;
struct conf {
int run;
int verbosity;
int exitcode;
int exitonerror;
char mode;
int flags;
#define CONF_F_FMT_JSON 0x1 /* JSON formatting */
#define CONF_F_KEY_DELIM 0x2 /* Producer: use key delimiter */
#define CONF_F_OFFSET 0x4 /* Print offsets */
#define CONF_F_TEE 0x8 /* Tee output when producing */
#define CONF_F_NULL 0x10 /* -Z: Send empty messages as NULL */
#define CONF_F_LINE 0x20 /* Read files in line mode when producing */
#define CONF_F_APIVERREQ 0x40 /* Enable api.version.request=true */
#define CONF_F_APIVERREQ_USER 0x80 /* User set api.version.request */
#define CONF_F_NO_CONF_SEARCH 0x100 /* Disable default config file search */
#define CONF_F_BROKERS_SEEN 0x200 /* Brokers have been configured */
#define CONF_F_FMT_AVRO_KEY 0x400 /* Convert key from Avro to JSON */
#define CONF_F_FMT_AVRO_VALUE 0x800 /* Convert value from Avro to JSON */
#define CONF_F_SR_URL_SEEN 0x1000 /* schema.registry.url/-r seen */
int delim;
int key_delim;
struct {
fmt_type_t type;
const char *str;
int str_len;
} fmt[KC_FMT_MAX_SIZE];
int fmt_cnt;
/**< Producer: per-field pack-format, see pack()
* Consumer: per-field unpack-format, see unpack(). */
const char *pack[KC_MSG_FIELD_CNT];
int msg_size;
char *brokers;
char *topic;
int32_t partition;
rd_kafka_headers_t *headers;
char *group;
char *fixed_key;
int32_t fixed_key_len;
int64_t offset;
#if RD_KAFKA_VERSION >= 0x00090300
int64_t startts;
int64_t stopts;
#endif
int exit_eof;
int64_t msg_cnt;
int metadata_timeout;
char *null_str;
int null_str_len;
int txn;
rd_kafka_conf_t *rk_conf;
rd_kafka_topic_conf_t *rkt_conf;
rd_kafka_t *rk;
rd_kafka_topic_t *rkt;
char *debug;
int term_sig; /**< Termination signal */
#if ENABLE_AVRO
serdes_conf_t *srconf;
char *schema_registry_url;
#endif
};
extern struct conf conf;
void RD_NORETURN fatal0 (const char *func, int line,
const char *fmt, ...);
void error0 (int erroronexit, const char *func, int line,
const char *fmt, ...);
#define KC_FATAL(.../*fmt*/) fatal0(__FUNCTION__, __LINE__, __VA_ARGS__)
#define KC_ERROR(.../*fmt*/) error0(conf.exitonerror, __FUNCTION__, __LINE__, __VA_ARGS__)
/* Info printout */
#define KC_INFO(VERBLVL,.../*fmt*/) do { \
if (conf.verbosity >= (VERBLVL)) \
fprintf(stderr, "%% " __VA_ARGS__); \
} while (0)
/*
* format.c
*/
void pack_check (const char *what, const char *fmt);
void fmt_msg_output (FILE *fp, const rd_kafka_message_t *rkmessage);
void fmt_parse (const char *fmt);
void fmt_init (void);
void fmt_term (void);
#if ENABLE_JSON
/*
* json.c
*/
void fmt_msg_output_json (FILE *fp, const rd_kafka_message_t *rkmessage);
void metadata_print_json (const struct rd_kafka_metadata *metadata,
int32_t controllerid);
void partition_list_print_json (const rd_kafka_topic_partition_list_t *parts,
void *json_gen);
void fmt_init_json (void);
void fmt_term_json (void);
int json_can_emit_verbatim (void);
#endif
#if ENABLE_AVRO
/*
* avro.c
*/
char *kc_avro_to_json (const void *data, size_t data_len,
char *errstr, size_t errstr_size);
void kc_avro_init (const char *key_schema_name,
const char *key_schema_path,
const char *value_schema_name,
const char *value_schema_path);
void kc_avro_term (void);
#endif
/*
* tools.c
*/
int query_offsets_by_time (rd_kafka_topic_partition_list_t *offsets);
|