1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
|
package kafka
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
)
// TestError exercises the Error type in three ways:
//
//   - every error code listed below must report a non-empty title,
//     description, and error message;
//   - a code the test treats as invalid (-2) must report an empty title and
//     description;
//   - the error produced by messageTooLarge must match MessageSizeTooLarge
//     through ErrorIs, while the reverse direction must not match.
func TestError(t *testing.T) {
	// Error codes expected to carry complete metadata.
	known := []Error{
		Unknown,
		OffsetOutOfRange,
		InvalidMessage,
		UnknownTopicOrPartition,
		InvalidMessageSize,
		LeaderNotAvailable,
		NotLeaderForPartition,
		RequestTimedOut,
		BrokerNotAvailable,
		ReplicaNotAvailable,
		MessageSizeTooLarge,
		StaleControllerEpoch,
		OffsetMetadataTooLarge,
		GroupLoadInProgress,
		GroupCoordinatorNotAvailable,
		NotCoordinatorForGroup,
		InvalidTopic,
		RecordListTooLarge,
		NotEnoughReplicas,
		NotEnoughReplicasAfterAppend,
		InvalidRequiredAcks,
		IllegalGeneration,
		InconsistentGroupProtocol,
		InvalidGroupId,
		UnknownMemberId,
		InvalidSessionTimeout,
		RebalanceInProgress,
		InvalidCommitOffsetSize,
		TopicAuthorizationFailed,
		GroupAuthorizationFailed,
		ClusterAuthorizationFailed,
		InvalidTimestamp,
		UnsupportedSASLMechanism,
		IllegalSASLState,
		UnsupportedVersion,
		TopicAlreadyExists,
		InvalidPartitionNumber,
		InvalidReplicationFactor,
		InvalidReplicaAssignment,
		InvalidConfiguration,
		NotController,
		InvalidRequest,
		UnsupportedForMessageFormat,
		PolicyViolation,
		OutOfOrderSequenceNumber,
		DuplicateSequenceNumber,
		InvalidProducerEpoch,
		InvalidTransactionState,
		InvalidProducerIDMapping,
		InvalidTransactionTimeout,
		ConcurrentTransactions,
		TransactionCoordinatorFenced,
		TransactionalIDAuthorizationFailed,
		SecurityDisabled,
		BrokerAuthorizationFailed,
		KafkaStorageError,
		LogDirNotFound,
		SASLAuthenticationFailed,
		UnknownProducerId,
		ReassignmentInProgress,
		DelegationTokenAuthDisabled,
		DelegationTokenNotFound,
		DelegationTokenOwnerMismatch,
		DelegationTokenRequestNotAllowed,
		DelegationTokenAuthorizationFailed,
		DelegationTokenExpired,
		InvalidPrincipalType,
		NonEmptyGroup,
		GroupIdNotFound,
		FetchSessionIDNotFound,
		InvalidFetchSessionEpoch,
		ListenerNotFound,
		TopicDeletionDisabled,
		FencedLeaderEpoch,
		UnknownLeaderEpoch,
		UnsupportedCompressionType,
	}

	// One subtest per code; the subtests run synchronously, so the closure
	// observes the current iteration's value.
	for _, code := range known {
		name := fmt.Sprintf("verify that error %d has a non-empty title, description, and error message", code)
		t.Run(name, func(t *testing.T) {
			// t.Error (not t.Fatal) so every missing piece is reported.
			if title := code.Title(); len(title) == 0 {
				t.Error("empty title")
			}
			if desc := code.Description(); len(desc) == 0 {
				t.Error("empty description")
			}
			if text := code.Error(); len(text) == 0 {
				t.Error("empty error message")
			}
		})
	}

	t.Run("verify that an invalid error code has an empty title and description", func(t *testing.T) {
		invalid := Error(-2)

		if title := invalid.Title(); len(title) > 0 {
			t.Error("non-empty title:", title)
		}
		if desc := invalid.Description(); len(desc) > 0 {
			t.Error("non-empty description:", desc)
		}
	})

	t.Run("MessageTooLargeError error.Is satisfaction", func(t *testing.T) {
		sentinel := MessageSizeTooLarge
		batch := []Message{
			{Key: []byte("key"), Value: []byte("value")},
			{Key: []byte("key"), Value: make([]byte, 8)},
		}

		// NOTE(review): the second argument is presumably the index of the
		// offending message in the batch — confirm against messageTooLarge.
		tooLarge := messageTooLarge(batch, 1)

		// Matching is one-directional: the sentinel does not match the
		// specific error, but the specific error matches the sentinel.
		assert.NotErrorIs(t, sentinel, tooLarge)
		assert.Contains(t, tooLarge.Error(), MessageSizeTooLarge.Error())
		assert.ErrorIs(t, tooLarge, MessageSizeTooLarge)
	})
}
|