1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
|
package sarama
import (
"encoding/binary"
"time"
)
const (
isTransactionalMask = 0x10
controlMask = 0x20
maximumRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1
)
// RecordHeader stores key and value for a record header
type RecordHeader struct {
Key []byte
Value []byte
}
func (h *RecordHeader) encode(pe packetEncoder) error {
if err := pe.putVarintBytes(h.Key); err != nil {
return err
}
return pe.putVarintBytes(h.Value)
}
func (h *RecordHeader) decode(pd packetDecoder) (err error) {
if h.Key, err = pd.getVarintBytes(); err != nil {
return err
}
if h.Value, err = pd.getVarintBytes(); err != nil {
return err
}
return nil
}
// Record is kafka record type
type Record struct {
Headers []*RecordHeader
Attributes int8
TimestampDelta time.Duration
OffsetDelta int64
Key []byte
Value []byte
length varintLengthField
}
func (r *Record) encode(pe packetEncoder) error {
pe.push(&r.length)
pe.putInt8(r.Attributes)
pe.putVarint(int64(r.TimestampDelta / time.Millisecond))
pe.putVarint(r.OffsetDelta)
if err := pe.putVarintBytes(r.Key); err != nil {
return err
}
if err := pe.putVarintBytes(r.Value); err != nil {
return err
}
pe.putVarint(int64(len(r.Headers)))
for _, h := range r.Headers {
if err := h.encode(pe); err != nil {
return err
}
}
return pe.pop()
}
func (r *Record) decode(pd packetDecoder) (err error) {
if err = pd.push(&r.length); err != nil {
return err
}
if r.Attributes, err = pd.getInt8(); err != nil {
return err
}
timestamp, err := pd.getVarint()
if err != nil {
return err
}
r.TimestampDelta = time.Duration(timestamp) * time.Millisecond
if r.OffsetDelta, err = pd.getVarint(); err != nil {
return err
}
if r.Key, err = pd.getVarintBytes(); err != nil {
return err
}
if r.Value, err = pd.getVarintBytes(); err != nil {
return err
}
numHeaders, err := pd.getVarint()
if err != nil {
return err
}
if numHeaders >= 0 {
r.Headers = make([]*RecordHeader, numHeaders)
}
for i := int64(0); i < numHeaders; i++ {
hdr := new(RecordHeader)
if err := hdr.decode(pd); err != nil {
return err
}
r.Headers[i] = hdr
}
return pd.pop()
}
|