package sarama

import (
	"fmt"
	"hash"
	"hash/crc32"
	"hash/fnv"
	"math/rand"
	"time"
)
// Partitioner is anything that, given a Kafka message and a number of partitions indexed [0...numPartitions-1],
// decides to which partition to send the message. Random, round-robin and hash-based implementations are provided
// as simple defaults (see NewRandomPartitioner, NewRoundRobinPartitioner and NewHashPartitioner).
type Partitioner interface {
	// Partition takes a message and partition count and chooses a partition.
	// It returns the chosen partition index together with any error that
	// prevented a choice from being made.
	Partition(message *ProducerMessage, numPartitions int32) (int32, error)
	// RequiresConsistency indicates to the user of the partitioner whether the
	// mapping of key->partition is consistent or not. Specifically, if a
	// partitioner requires consistency then it must be allowed to choose from all
	// partitions (even ones known to be unavailable), and its choice must be
	// respected by the caller. The obvious example is the hash partitioner.
	RequiresConsistency() bool
}
// DynamicConsistencyPartitioner can optionally be implemented by Partitioners
// that need more flexibility than the all-or-nothing answer given by
// Partitioner.RequiresConsistency. It lets a partitioner require consistency
// for some messages but not for others. This is useful for, e.g., the hash
// partitioner, which does not require consistency when the message key is nil.
type DynamicConsistencyPartitioner interface {
	Partitioner
	// MessageRequiresConsistency is similar to Partitioner.RequiresConsistency,
	// but takes in the message being partitioned so that the partitioner can
	// make a per-message determination. It reports true when the choice made
	// for this particular message must be respected by the caller.
	MessageRequiresConsistency(message *ProducerMessage) bool
}
// PartitionerConstructor is the type for a function capable of constructing new Partitioners.
// It receives a topic name and returns the Partitioner to use.
type PartitionerConstructor func(topic string) Partitioner
// manualPartitioner is the stateless Partitioner returned by NewManualPartitioner;
// it uses the partition that is already set on each message.
type manualPartitioner struct{}
// HashPartitionerOption lets you modify the default values of the hash partitioner.
// An option is a function that receives the partitioner under construction and
// mutates it in place; see NewCustomPartitioner for where options are applied.
type HashPartitionerOption func(*hashPartitioner)
// WithAbsFirst returns an option that makes the hash partitioner handle
// absolute values in the same way as the reference Java implementation.
// It does so by setting the partitioner's referenceAbs flag.
func WithAbsFirst() HashPartitionerOption {
	enableReferenceAbs := func(p *hashPartitioner) { p.referenceAbs = true }
	return enableReferenceAbs
}
// WithHashUnsigned returns an option that makes the hash partitioner treat the
// hashed value as unsigned when partitioning. It is intended to be combined
// with the crc32 hash algorithm to be compatible with librdkafka's
// implementation. It does so by setting the partitioner's hashUnsigned flag.
func WithHashUnsigned() HashPartitionerOption {
	enableUnsigned := func(p *hashPartitioner) { p.hashUnsigned = true }
	return enableUnsigned
}
// WithCustomHashFunction returns an option that replaces the hash function used
// for partitioning. hasher is a factory, not an instance: it is invoked every
// time the option is applied, and the hash.Hash32 it returns is installed on
// the partitioner the option is applied to.
func WithCustomHashFunction(hasher func() hash.Hash32) HashPartitionerOption {
	return func(p *hashPartitioner) {
		fresh := hasher()
		p.hasher = fresh
	}
}
// WithCustomFallbackPartitioner returns an option that replaces the Partitioner
// the hash partitioner falls back to when a message has no key to hash.
// The given randomHP value itself is stored, so every partitioner this option
// is applied to refers to the same fallback instance.
func WithCustomFallbackPartitioner(randomHP Partitioner) HashPartitionerOption {
	installFallback := func(p *hashPartitioner) { p.random = randomHP }
	return installFallback
}
// NewManualPartitioner returns a Partitioner that performs no selection of its
// own: it produces to whatever partition is already set in the Partition field
// of the provided ProducerMessage. The topic argument is not used.
func NewManualPartitioner(topic string) Partitioner {
	return &manualPartitioner{}
}
// Partition returns the partition already stored in message.Partition and a
// nil error. numPartitions is not consulted, so the value is not range-checked
// here.
func (p *manualPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
	return message.Partition, nil
}
// RequiresConsistency always reports true: per the Partitioner contract, the
// manually selected partition must be respected by the caller.
func (p *manualPartitioner) RequiresConsistency() bool {
	return true
}
// randomPartitioner is the Partitioner returned by NewRandomPartitioner; it
// draws partition indexes from its own pseudo-random generator.
type randomPartitioner struct {
	// generator is the source of the random partition indexes.
	generator *rand.Rand
}
// NewRandomPartitioner returns a Partitioner which chooses a random partition each time.
// Each partitioner owns a private generator seeded from the current time in
// nanoseconds. The topic argument is not used.
func NewRandomPartitioner(topic string) Partitioner {
	seed := time.Now().UTC().UnixNano()
	return &randomPartitioner{
		generator: rand.New(rand.NewSource(seed)),
	}
}
// Partition returns a pseudo-random partition index in [0, numPartitions).
// The message itself is not inspected.
//
// It returns -1 and an error when numPartitions is not positive: the
// underlying generator's Intn panics for such arguments, and there is no valid
// partition to return in that case anyway.
func (p *randomPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
	if numPartitions <= 0 {
		return -1, fmt.Errorf("invalid partition count %d: must be positive", numPartitions)
	}
	return int32(p.generator.Intn(int(numPartitions))), nil
}
// RequiresConsistency always reports false: a random choice carries no
// key->partition mapping that the caller would have to respect.
func (p *randomPartitioner) RequiresConsistency() bool {
	return false
}
// roundRobinPartitioner is the Partitioner returned by NewRoundRobinPartitioner;
// it hands out partition indexes in increasing order, wrapping back to zero.
type roundRobinPartitioner struct {
	// partition is the index the next call to Partition will hand out, unless
	// it has reached the partition count, in which case the next call wraps to 0.
	partition int32
}
// NewRoundRobinPartitioner returns a Partitioner which walks through the available partitions one at a time,
// starting from partition 0. The topic argument is not used.
func NewRoundRobinPartitioner(topic string) Partitioner {
	return new(roundRobinPartitioner)
}
// Partition returns the next partition index in the rotation and advances the
// rotation by one. When the stored index has reached (or exceeds) numPartitions
// the rotation restarts at 0. The message itself is not inspected and the
// returned error is always nil.
func (p *roundRobinPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
	next := p.partition
	if next >= numPartitions {
		// Wrapped past the last partition (or the partition count shrank).
		next = 0
	}
	p.partition = next + 1
	return next, nil
}
// RequiresConsistency always reports false: the rotation carries no
// key->partition mapping that the caller would have to respect.
func (p *roundRobinPartitioner) RequiresConsistency() bool {
	return false
}
// hashPartitioner chooses a partition by hashing the encoded message key, and
// delegates to a fallback Partitioner for messages whose key is nil.
type hashPartitioner struct {
	random       Partitioner // fallback used for messages whose Key is nil
	hasher       hash.Hash32 // hashes the encoded key; Reset before every message
	referenceAbs bool        // mask the sign bit before the modulus (reference Java client behavior)
	hashUnsigned bool        // take the modulus of the hash as an unsigned value (librdkafka behavior)
}
// NewCustomHashPartitioner returns a PartitionerConstructor for hash partitioners that behave like the one
// built by NewHashPartitioner but use a custom hasher. The argument is a function providing the instance,
// implementing the hash.Hash32 interface; it is called once for every Partitioner constructed. This is to
// ensure that each partition dispatcher gets its own hasher, to avoid concurrency issues by sharing an instance.
func NewCustomHashPartitioner(hasher func() hash.Hash32) PartitionerConstructor {
	return func(topic string) Partitioner {
		// referenceAbs and hashUnsigned keep their zero value (false).
		return &hashPartitioner{
			random: NewRandomPartitioner(topic),
			hasher: hasher(),
		}
	}
}
// NewCustomPartitioner returns a PartitionerConstructor for hash partitioners whose components can be
// overridden via options. Every constructed partitioner starts from the defaults (a random fallback
// partitioner, the FNV-1a hash, both compatibility flags off) and then has the options applied to it
// in the order they were given.
func NewCustomPartitioner(options ...HashPartitionerOption) PartitionerConstructor {
	return func(topic string) Partitioner {
		hp := &hashPartitioner{
			random: NewRandomPartitioner(topic),
			hasher: fnv.New32a(),
		}
		for i := range options {
			options[i](hp)
		}
		return hp
	}
}
// NewHashPartitioner returns a Partitioner which behaves as follows. If the message's key is nil then a
// random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes of the message key is used,
// modulus the number of partitions. This ensures that messages with the same key always end up on the
// same partition.
func NewHashPartitioner(topic string) Partitioner {
	// referenceAbs and hashUnsigned keep their zero value (false).
	return &hashPartitioner{
		random: NewRandomPartitioner(topic),
		hasher: fnv.New32a(),
	}
}
// NewReferenceHashPartitioner is like NewHashPartitioner except that it handles absolute values
// in the same way as the reference Java implementation. NewHashPartitioner was supposed to do
// that but it had a mistake, and now there are people depending on both behaviors. This will
// all go away on the next major version bump.
func NewReferenceHashPartitioner(topic string) Partitioner {
	return &hashPartitioner{
		random:       NewRandomPartitioner(topic),
		hasher:       fnv.New32a(),
		referenceAbs: true,
	}
}
// NewConsistentCRCHashPartitioner is like NewHashPartitioner except that it uses the *unsigned* crc32 hash
// (IEEE polynomial) of the encoded bytes of the message key, modulus the number of partitions. This is
// compatible with librdkafka's `consistent_random` partitioner.
func NewConsistentCRCHashPartitioner(topic string) Partitioner {
	return &hashPartitioner{
		random:       NewRandomPartitioner(topic),
		hasher:       crc32.NewIEEE(),
		hashUnsigned: true,
	}
}
// Partition chooses the partition for message out of numPartitions partitions.
//
// A message with a nil Key is handed to the fallback partitioner. Otherwise
// the encoded key is hashed and the hash is reduced modulo numPartitions in
// one of three ways, checked in this order:
//
//   - referenceAbs: the sign bit is masked off before the modulus, which is
//     compatible with the reference Java client but not past Sarama versions;
//   - hashUnsigned: the hash is treated as unsigned, which is compatible with
//     librdkafka's `consistent` partitioning but not past Sarama versions;
//   - otherwise: the signed modulus is taken first and then negated if it is
//     negative, which is the historical Sarama behavior.
//
// It returns -1 and an error when numPartitions is not positive, when the key
// cannot be encoded, or when the hasher rejects the encoded key.
func (p *hashPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
	// A zero count would make the modulus below panic and a negative count has
	// no valid answer, so reject both up front instead of crashing the caller.
	if numPartitions <= 0 {
		return -1, fmt.Errorf("invalid partition count %d: must be positive", numPartitions)
	}
	if message.Key == nil {
		return p.random.Partition(message, numPartitions)
	}
	keyBytes, err := message.Key.Encode()
	if err != nil {
		return -1, err
	}
	// The hasher is reused across messages, so clear any previous state first.
	p.hasher.Reset()
	if _, err = p.hasher.Write(keyBytes); err != nil {
		return -1, err
	}
	sum := p.hasher.Sum32()
	// Turns out we were doing our absolute value in a subtly different way from the upstream
	// implementation, but now we need to maintain backwards compat for people who started using
	// the old version, hence the separate modes below.
	switch {
	case p.referenceAbs:
		return (int32(sum) & 0x7fffffff) % numPartitions, nil
	case p.hashUnsigned:
		return int32(sum % uint32(numPartitions)), nil
	default:
		partition := int32(sum) % numPartitions
		if partition < 0 {
			partition = -partition
		}
		return partition, nil
	}
}
// RequiresConsistency always reports true. Callers that want a per-message
// answer can use MessageRequiresConsistency instead.
func (p *hashPartitioner) RequiresConsistency() bool {
	return true
}
// MessageRequiresConsistency reports whether message has a non-nil Key. Only
// keyed messages are hashed to a partition; messages with a nil Key are handed
// to the fallback partitioner, so they do not require consistency.
func (p *hashPartitioner) MessageRequiresConsistency(message *ProducerMessage) bool {
	return message.Key != nil
}
 |