package kafka

import (
	"context"
	"errors"
	"fmt"
	"net"

	"github.com/segmentio/kafka-go/protocol"
	produceAPI "github.com/segmentio/kafka-go/protocol/produce"
	"github.com/segmentio/kafka-go/protocol/rawproduce"
)

// RawProduceRequest represents a request sent to a kafka broker to produce records
// to a topic partition. The request contains a pre-encoded/raw record set.
type RawProduceRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// The topic to produce the records to.
	Topic string

	// The partition to produce the records to.
	Partition int

	// The level of required acknowledgements to ask the kafka broker for.
	RequiredAcks RequiredAcks

	// The message format version used when encoding the records.
	//
	// By default, the client automatically determines which version should be
	// used based on the version of the Produce API supported by the server.
	MessageVersion int

	// An optional transactional id, set when the produce request is part of
	// a transaction.
	TransactionalID string

	// The pre-encoded record set to produce to the topic partition.
	RawRecords protocol.RawRecordSet
}

// RawProduce sends a raw produce request to a kafka broker and returns the response.
//
// If the request contained no records, an empty ProduceResponse and a nil
// error are returned.
//
// When the request is configured with RequiredAcks=none, both the response and
// the error will be nil on success.
func (c *Client) RawProduce(ctx context.Context, req *RawProduceRequest) (*ProduceResponse, error) {
	m, err := c.roundTrip(ctx, req.Addr, &rawproduce.Request{
		TransactionalID: req.TransactionalID,
		Acks:            int16(req.RequiredAcks),
		Timeout:         c.timeoutMs(ctx, defaultProduceTimeout),
		Topics: []rawproduce.RequestTopic{{
			Topic: req.Topic,
			Partitions: []rawproduce.RequestPartition{{
				Partition: int32(req.Partition),
				RecordSet: req.RawRecords,
			}},
		}},
	})

	switch {
	case err == nil:
	case errors.Is(err, protocol.ErrNoRecord):
		// An empty record set is not treated as a failure.
		return new(ProduceResponse), nil
	default:
		return nil, fmt.Errorf("kafka.(*Client).RawProduce: %w", err)
	}

	// With RequiredAcks=none the broker sends no response to decode.
	if req.RequiredAcks == RequireNone {
		return nil, nil
	}

	res := m.(*produceAPI.Response)
	if len(res.Topics) == 0 {
		return nil, fmt.Errorf("kafka.(*Client).RawProduce: %w", protocol.ErrNoTopic)
	}
	topic := &res.Topics[0]
	if len(topic.Partitions) == 0 {
		return nil, fmt.Errorf("kafka.(*Client).RawProduce: %w", protocol.ErrNoPartition)
	}
	partition := &topic.Partitions[0]

	// The request targets a single topic partition, so only the first
	// topic and partition of the response are read.
	ret := &ProduceResponse{
		Throttle:       makeDuration(res.ThrottleTimeMs),
		Error:          makeError(partition.ErrorCode, partition.ErrorMessage),
		BaseOffset:     partition.BaseOffset,
		LogAppendTime:  makeTime(partition.LogAppendTime),
		LogStartOffset: partition.LogStartOffset,
	}

	// Index per-record errors by their position in the batch.
	if len(partition.RecordErrors) != 0 {
		ret.RecordErrors = make(map[int]error, len(partition.RecordErrors))

		for _, recErr := range partition.RecordErrors {
			ret.RecordErrors[int(recErr.BatchIndex)] = errors.New(recErr.BatchIndexErrorMessage)
		}
	}

	return ret, nil
}