1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
|
//go:build !functional
package sarama
import "testing"
// Byte fixtures for TestFetchRequest. Each slice is handed to testRequest
// together with the FetchRequest built in the matching subtest.
//
// NOTE(review): the field labels were derived by lining the byte values up
// with the values assigned in the test and with the labels on the v11
// fixture. Labels ending in "?" are inferred from position only and should be
// confirmed against the encoder.
var (
	// Paired with a zero-value FetchRequest.
	fetchRequestNoBlocks = []byte{
		0xFF, 0xFF, 0xFF, 0xFF, // replicaID?
		0x00, 0x00, 0x00, 0x00, // maxWaitTime
		0x00, 0x00, 0x00, 0x00, // minBytes
		0x00, 0x00, 0x00, 0x00, // topic count?
	}

	// Paired with MaxWaitTime = 0x20 and MinBytes = 0xEF.
	fetchRequestWithProperties = []byte{
		0xFF, 0xFF, 0xFF, 0xFF, // replicaID?
		0x00, 0x00, 0x00, 0x20, // maxWaitTime
		0x00, 0x00, 0x00, 0xEF, // minBytes
		0x00, 0x00, 0x00, 0x00, // topic count?
	}

	// Paired with a single AddBlock("topic", 0x12, 0x34, 0x56, -1) call on a
	// request whose Version is left at zero.
	fetchRequestOneBlock = []byte{
		0xFF, 0xFF, 0xFF, 0xFF, // replicaID?
		0x00, 0x00, 0x00, 0x00, // maxWaitTime
		0x00, 0x00, 0x00, 0x00, // minBytes
		0x00, 0x00, 0x00, 0x01, // topic count?
		0x00, 0x05, 't', 'o', 'p', 'i', 'c', // topic name behind a 2-byte length
		0x00, 0x00, 0x00, 0x01, // partition count?
		0x00, 0x00, 0x00, 0x12, // partitionID
		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x34, // fetchOffset
		0x00, 0x00, 0x00, 0x56, // maxBytes
	}

	// Paired with Version = 4, MaxBytes = 0xFF, Isolation = ReadCommitted and
	// one block.
	fetchRequestOneBlockV4 = []byte{
		0xFF, 0xFF, 0xFF, 0xFF, // replicaID?
		0x00, 0x00, 0x00, 0x00, // maxWaitTime
		0x00, 0x00, 0x00, 0x00, // minBytes
		0x00, 0x00, 0x00, 0xFF, // request-level maxBytes
		0x01,                   // isolation level?
		0x00, 0x00, 0x00, 0x01, // topic count?
		0x00, 0x05, 't', 'o', 'p', 'i', 'c', // topic name behind a 2-byte length
		0x00, 0x00, 0x00, 0x01, // partition count?
		0x00, 0x00, 0x00, 0x12, // partitionID
		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x34, // fetchOffset
		0x00, 0x00, 0x00, 0x56, // maxBytes
	}

	// Paired with Version = 11, session fields, a leader epoch of 0x66 and
	// RackID = "rack01".
	fetchRequestOneBlockV11 = []byte{
		0xFF, 0xFF, 0xFF, 0xFF, // replicaID?
		0x00, 0x00, 0x00, 0x00, // maxWaitTime
		0x00, 0x00, 0x00, 0x00, // minBytes
		0x00, 0x00, 0x00, 0xFF, // request-level maxBytes
		0x01,                   // isolation level?
		0x00, 0x00, 0x00, 0xAA, // sessionID
		0x00, 0x00, 0x00, 0xEE, // sessionEpoch
		0x00, 0x00, 0x00, 0x01, // topic count?
		0x00, 0x05, 't', 'o', 'p', 'i', 'c', // topic name behind a 2-byte length
		0x00, 0x00, 0x00, 0x01, // partition count?
		0x00, 0x00, 0x00, 0x12, // partitionID
		0x00, 0x00, 0x00, 0x66, // currentLeaderEpoch
		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x34, // fetchOffset
		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // logStartOffset
		0x00, 0x00, 0x00, 0x56, // maxBytes
		0x00, 0x00, 0x00, 0x00, // forgotten topic count?
		0x00, 0x06, 'r', 'a', 'c', 'k', '0', '1', // rackID
	}
)
// TestFetchRequest runs one subtest per fetchRequest* byte fixture. Each
// subtest builds a fresh FetchRequest, applies the case's configuration, and
// hands the request plus the fixture to testRequest (defined elsewhere in the
// package).
func TestFetchRequest(t *testing.T) {
	cases := []struct {
		// name is the subtest name given to t.Run.
		name string
		// label is the name argument forwarded to testRequest; it is kept
		// separate because it does not always equal name.
		label string
		// configure populates a freshly allocated request. A nil configure
		// leaves the request at its zero value.
		configure func(request *FetchRequest)
		// expected is the fixture passed to testRequest.
		expected []byte
	}{
		{
			name:     "no blocks",
			label:    "no blocks",
			expected: fetchRequestNoBlocks,
		},
		{
			name:  "with properties",
			label: "with properties",
			configure: func(request *FetchRequest) {
				request.MaxWaitTime = 0x20
				request.MinBytes = 0xEF
			},
			expected: fetchRequestWithProperties,
		},
		{
			name:  "one block",
			label: "one block",
			configure: func(request *FetchRequest) {
				request.MaxWaitTime = 0
				request.MinBytes = 0
				request.AddBlock("topic", 0x12, 0x34, 0x56, -1)
			},
			expected: fetchRequestOneBlock,
		},
		{
			name:  "one block v4",
			label: "one block v4",
			configure: func(request *FetchRequest) {
				// Version is assigned before AddBlock, as in the original
				// statement order.
				request.Version = 4
				request.MaxBytes = 0xFF
				request.Isolation = ReadCommitted
				request.AddBlock("topic", 0x12, 0x34, 0x56, -1)
			},
			expected: fetchRequestOneBlockV4,
		},
		{
			name:  "one block v11 rackid and leader epoch",
			label: "one block v11 rackid",
			configure: func(request *FetchRequest) {
				request.Version = 11
				request.MaxBytes = 0xFF
				request.Isolation = ReadCommitted
				request.SessionID = 0xAA
				request.SessionEpoch = 0xEE
				request.AddBlock("topic", 0x12, 0x34, 0x56, 0x66)
				request.RackID = "rack01"
			},
			expected: fetchRequestOneBlockV11,
		},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			request := new(FetchRequest)
			if tc.configure != nil {
				tc.configure(request)
			}
			testRequest(t, tc.label, request, tc.expected)
		})
	}
}
|