1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
|
package kafka
import (
"context"
"errors"
"fmt"
"net"
"time"
"github.com/segmentio/kafka-go/protocol"
)
const (
defaultCreateTopicsTimeout = 2 * time.Second
defaultDeleteTopicsTimeout = 2 * time.Second
defaultCreatePartitionsTimeout = 2 * time.Second
defaultProduceTimeout = 500 * time.Millisecond
defaultMaxWait = 500 * time.Millisecond
)
// Client is a high-level API to interract with kafka brokers.
//
// All methods of the Client type accept a context as first argument, which may
// be used to asynchronously cancel the requests.
//
// Clients are safe to use concurrently from multiple goroutines, as long as
// their configuration is not changed after first use.
type Client struct {
// Address of the kafka cluster (or specific broker) that the client will be
// sending requests to.
//
// This field is optional, the address may be provided in each request
// instead. The request address takes precedence if both were specified.
Addr net.Addr
// Time limit for requests sent by this client.
//
// If zero, no timeout is applied.
Timeout time.Duration
// A transport used to communicate with the kafka brokers.
//
// If nil, DefaultTransport is used.
Transport RoundTripper
}
// A ConsumerGroup and Topic as these are both strings we define a type for
// clarity when passing to the Client as a function argument
//
// N.B TopicAndGroup is currently experimental! Therefore, it is subject to
// change, including breaking changes between MINOR and PATCH releases.
//
// DEPRECATED: this type will be removed in version 1.0, programs should
// migrate to use kafka.(*Client).OffsetFetch instead.
type TopicAndGroup struct {
Topic string
GroupId string
}
// ConsumerOffsets returns a map[int]int64 of partition to committed offset for
// a consumer group id and topic.
//
// DEPRECATED: this method will be removed in version 1.0, programs should
// migrate to use kafka.(*Client).OffsetFetch instead.
func (c *Client) ConsumerOffsets(ctx context.Context, tg TopicAndGroup) (map[int]int64, error) {
metadata, err := c.Metadata(ctx, &MetadataRequest{
Topics: []string{tg.Topic},
})
if err != nil {
return nil, fmt.Errorf("failed to get topic metadata :%w", err)
}
topic := metadata.Topics[0]
partitions := make([]int, len(topic.Partitions))
for i := range topic.Partitions {
partitions[i] = topic.Partitions[i].ID
}
offsets, err := c.OffsetFetch(ctx, &OffsetFetchRequest{
GroupID: tg.GroupId,
Topics: map[string][]int{
tg.Topic: partitions,
},
})
if err != nil {
return nil, fmt.Errorf("failed to get offsets: %w", err)
}
topicOffsets := offsets.Topics[topic.Name]
partitionOffsets := make(map[int]int64, len(topicOffsets))
for _, off := range topicOffsets {
partitionOffsets[off.Partition] = off.CommittedOffset
}
return partitionOffsets, nil
}
func (c *Client) roundTrip(ctx context.Context, addr net.Addr, msg protocol.Message) (protocol.Message, error) {
if c.Timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, c.Timeout)
defer cancel()
}
if addr == nil {
if addr = c.Addr; addr == nil {
return nil, errors.New("no address was given for the kafka cluster in the request or on the client")
}
}
return c.transport().RoundTrip(ctx, addr, msg)
}
func (c *Client) transport() RoundTripper {
if c.Transport != nil {
return c.Transport
}
return DefaultTransport
}
func (c *Client) timeout(ctx context.Context, defaultTimeout time.Duration) time.Duration {
timeout := c.Timeout
if deadline, ok := ctx.Deadline(); ok {
if remain := time.Until(deadline); remain < timeout {
timeout = remain
}
}
if timeout > 0 {
// Half the timeout because it is communicated to kafka in multiple
// requests (e.g. Fetch, Produce, etc...), this adds buffer to account
// for network latency when waiting for the response from kafka.
return timeout / 2
}
return defaultTimeout
}
func (c *Client) timeoutMs(ctx context.Context, defaultTimeout time.Duration) int32 {
return milliseconds(c.timeout(ctx, defaultTimeout))
}
|