1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
|
//go:build !functional
package sarama
import (
"errors"
"fmt"
"testing"
"time"
)
var (
produceResponseNoBlocksV0 = []byte{
0x00, 0x00, 0x00, 0x00,
}
produceResponseManyBlocksVersions = map[int][]byte{
0: {
0x00, 0x00, 0x00, 0x01,
0x00, 0x03, 'f', 'o', 'o',
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x01, // Partition 1
0x00, 0x02, // ErrInvalidMessage
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
},
1: {
0x00, 0x00, 0x00, 0x01,
0x00, 0x03, 'f', 'o', 'o',
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x01, // Partition 1
0x00, 0x02, // ErrInvalidMessage
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
0x00, 0x00, 0x00, 0x64, // 100 ms throttle time
},
2: {
0x00, 0x00, 0x00, 0x01,
0x00, 0x03, 'f', 'o', 'o',
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x01, // Partition 1
0x00, 0x02, // ErrInvalidMessage
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0xE8, // Timestamp January 1st 0001 at 00:00:01,000 UTC (LogAppendTime was used)
0x00, 0x00, 0x00, 0x64, // 100 ms throttle time
},
7: { // version 7 adds StartOffset
0x00, 0x00, 0x00, 0x01,
0x00, 0x03, 'f', 'o', 'o',
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x01, // Partition 1
0x00, 0x02, // ErrInvalidMessage
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0xE8, // Timestamp January 1st 0001 at 00:00:01,000 UTC (LogAppendTime was used)
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x32, // StartOffset 50
0x00, 0x00, 0x00, 0x64, // 100 ms throttle time
},
}
)
func TestProduceResponseDecode(t *testing.T) {
response := ProduceResponse{}
testVersionDecodable(t, "no blocks", &response, produceResponseNoBlocksV0, 0)
if len(response.Blocks) != 0 {
t.Error("Decoding produced", len(response.Blocks), "topics where there were none")
}
for v, produceResponseManyBlocks := range produceResponseManyBlocksVersions {
t.Logf("Decoding produceResponseManyBlocks version %d", v)
testVersionDecodable(t, "many blocks", &response, produceResponseManyBlocks, int16(v))
if len(response.Blocks) != 1 {
t.Error("Decoding produced", len(response.Blocks), "topics where there was 1")
}
if len(response.Blocks["foo"]) != 1 {
t.Error("Decoding produced", len(response.Blocks["foo"]), "partitions for 'foo' where there was one")
}
block := response.GetBlock("foo", 1)
if block == nil {
t.Error("Decoding did not produce a block for foo/1")
} else {
if !errors.Is(block.Err, ErrInvalidMessage) {
t.Error("Decoding failed for foo/1/Err, got:", int16(block.Err))
}
if block.Offset != 255 {
t.Error("Decoding failed for foo/1/Offset, got:", block.Offset)
}
if v >= 2 {
if !block.Timestamp.Equal(time.Unix(1, 0)) {
t.Error("Decoding failed for foo/1/Timestamp, got:", block.Timestamp)
}
}
if v >= 7 {
if block.StartOffset != 50 {
t.Error("Decoding failed for foo/1/StartOffset, got:", block.StartOffset)
}
}
}
if v >= 1 {
if expected := 100 * time.Millisecond; response.ThrottleTime != expected {
t.Error("Failed decoding produced throttle time, expected:", expected, ", got:", response.ThrottleTime)
}
}
}
}
func TestProduceResponseEncode(t *testing.T) {
response := ProduceResponse{}
response.Blocks = make(map[string]map[int32]*ProduceResponseBlock)
testEncodable(t, "empty", &response, produceResponseNoBlocksV0)
response.Blocks["foo"] = make(map[int32]*ProduceResponseBlock)
response.Blocks["foo"][1] = &ProduceResponseBlock{
Err: ErrInvalidMessage,
Offset: 255,
Timestamp: time.Unix(1, 0),
StartOffset: 50,
}
response.ThrottleTime = 100 * time.Millisecond
for v, produceResponseManyBlocks := range produceResponseManyBlocksVersions {
response.Version = int16(v)
testEncodable(t, fmt.Sprintf("many blocks version %d", v), &response, produceResponseManyBlocks)
}
}
func TestProduceResponseEncodeInvalidTimestamp(t *testing.T) {
response := ProduceResponse{}
response.Version = 2
response.Blocks = make(map[string]map[int32]*ProduceResponseBlock)
response.Blocks["t"] = make(map[int32]*ProduceResponseBlock)
response.Blocks["t"][0] = &ProduceResponseBlock{
Err: ErrNoError,
Offset: 0,
// Use a timestamp before Unix time
Timestamp: time.Unix(0, 0).Add(-1 * time.Millisecond),
}
response.ThrottleTime = 100 * time.Millisecond
_, err := encode(&response, nil)
if err == nil {
t.Error("Expecting error, got nil")
}
target := PacketEncodingError{}
if !errors.As(err, &target) {
t.Error("Expecting PacketEncodingError, got:", err)
}
}
|