package sarama

import (
	"hash"
	"hash/crc32"
	"hash/fnv"
	"math/rand"
	"time"
)

// Partitioner is anything that, given a Kafka message and a number of partitions indexed [0...numPartitions-1],
// decides to which partition to send the message. RandomPartitioner, RoundRobinPartitioner and HashPartitioner are provided
// as simple default implementations.
type Partitioner interface {
	// Partition takes a message and partition count and chooses a partition.
	Partition(message *ProducerMessage, numPartitions int32) (int32, error)

	// RequiresConsistency indicates to the user of the partitioner whether the
	// mapping of key->partition is consistent or not. Specifically, if a
	// partitioner requires consistency then it must be allowed to choose from all
	// partitions (even ones known to be unavailable), and its choice must be
	// respected by the caller. The obvious example is the HashPartitioner.
	RequiresConsistency() bool
}
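
// As an illustration of the interface above, here is a minimal sketch of a custom
// Partitioner that always routes to partition 0. It is not part of the library; the
// type name constantPartitioner is purely hypothetical.
type constantPartitioner struct{}

func (p *constantPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
	// Ignore the message entirely and always pick the first partition.
	return 0, nil
}

func (p *constantPartitioner) RequiresConsistency() bool {
	// The fixed choice must be respected by the caller, even if partition 0 is
	// currently known to be unavailable.
	return true
}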

// DynamicConsistencyPartitioner can optionally be implemented by Partitioners
// in order to allow more flexibility than is originally allowed by the
// RequiresConsistency method in the Partitioner interface. This allows
// partitioners to require consistency sometimes, but not always. It's useful
// for, e.g., the HashPartitioner, which does not require consistency if the
// message key is nil.
type DynamicConsistencyPartitioner interface {
	Partitioner

	// MessageRequiresConsistency is similar to Partitioner.RequiresConsistency,
	// but takes in the message being partitioned so that the partitioner can
	// make a per-message determination.
	MessageRequiresConsistency(message *ProducerMessage) bool
}

// PartitionerConstructor is the type for a function capable of constructing new Partitioners.
type PartitionerConstructor func(topic string) Partitioner

type manualPartitioner struct{}

// HashPartitionerOption lets you modify default values of the partitioner.
type HashPartitionerOption func(*hashPartitioner)

// WithAbsFirst means that the partitioner handles absolute values
// in the same way as the reference Java implementation.
func WithAbsFirst() HashPartitionerOption {
	return func(hp *hashPartitioner) {
		hp.referenceAbs = true
	}
}

// WithHashUnsigned means the partitioner treats the hashed value as unsigned when
// partitioning. This is intended to be combined with the crc32 hash algorithm to
// be compatible with librdkafka's implementation.
func WithHashUnsigned() HashPartitionerOption {
	return func(hp *hashPartitioner) {
		hp.hashUnsigned = true
	}
}

// WithCustomHashFunction lets you specify what hash function to use for the partitioning.
func WithCustomHashFunction(hasher func() hash.Hash32) HashPartitionerOption {
	return func(hp *hashPartitioner) {
		hp.hasher = hasher()
	}
}

// WithCustomFallbackPartitioner lets you specify what Partitioner should be used as a
// fallback when the message's key is nil.
func WithCustomFallbackPartitioner(randomHP Partitioner) HashPartitionerOption {
	return func(hp *hashPartitioner) {
		hp.random = randomHP
	}
}
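
// The options above are consumed by NewCustomPartitioner (defined further below). As a
// hedged sketch of composing them, the hypothetical constructor here enables the
// reference Java absolute-value handling via WithAbsFirst; the function name is an
// illustration only and not part of the library.
func exampleJavaCompatibleConstructor() PartitionerConstructor {
	// Equivalent in effect to NewReferenceHashPartitioner further below.
	return NewCustomPartitioner(WithAbsFirst())
}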

// NewManualPartitioner returns a Partitioner which uses the partition manually set in the provided
// ProducerMessage's Partition field as the partition to produce to.
func NewManualPartitioner(topic string) Partitioner {
	return new(manualPartitioner)
}

func (p *manualPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
	return message.Partition, nil
}

func (p *manualPartitioner) RequiresConsistency() bool {
	return true
}
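
// A hedged usage sketch for manual partitioning: the producer config is pointed at
// NewManualPartitioner and each message carries its target partition explicitly. The
// function name, topic and payload strings are illustrative assumptions only.
func exampleManualPartitioning() (*Config, *ProducerMessage) {
	cfg := NewConfig()
	cfg.Producer.Partitioner = NewManualPartitioner
	// The Partition field set here is exactly what the partitioner returns above.
	msg := &ProducerMessage{Topic: "example-topic", Partition: 2, Value: StringEncoder("payload")}
	return cfg, msg
}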

type randomPartitioner struct {
	generator *rand.Rand
}

// NewRandomPartitioner returns a Partitioner which chooses a random partition each time.
func NewRandomPartitioner(topic string) Partitioner {
	p := new(randomPartitioner)
	p.generator = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
	return p
}

func (p *randomPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
	return int32(p.generator.Intn(int(numPartitions))), nil
}

func (p *randomPartitioner) RequiresConsistency() bool {
	return false
}

type roundRobinPartitioner struct {
	partition int32
}

// NewRoundRobinPartitioner returns a Partitioner which walks through the available partitions one at a time.
func NewRoundRobinPartitioner(topic string) Partitioner {
	return &roundRobinPartitioner{}
}

func (p *roundRobinPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
	if p.partition >= numPartitions {
		p.partition = 0
	}
	ret := p.partition
	p.partition++
	return ret, nil
}

func (p *roundRobinPartitioner) RequiresConsistency() bool {
	return false
}

type hashPartitioner struct {
	random       Partitioner
	hasher       hash.Hash32
	referenceAbs bool
	hashUnsigned bool
}

// NewCustomHashPartitioner is a wrapper around NewHashPartitioner, allowing the use of a custom hasher.
// The argument is a function providing the instance, implementing the hash.Hash32 interface. This is to ensure that
// each partition dispatcher gets its own hasher, to avoid concurrency issues by sharing an instance.
func NewCustomHashPartitioner(hasher func() hash.Hash32) PartitionerConstructor {
	return func(topic string) Partitioner {
		p := new(hashPartitioner)
		p.random = NewRandomPartitioner(topic)
		p.hasher = hasher()
		p.referenceAbs = false
		p.hashUnsigned = false
		return p
	}
}
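
// A hedged sketch of NewCustomHashPartitioner in use: passing a constructor such as
// crc32.NewIEEE means each dispatcher builds its own hasher instance, as required by
// the comment above. The function name exampleCustomHasherConfig is hypothetical.
func exampleCustomHasherConfig() *Config {
	cfg := NewConfig()
	// Keyed messages will be partitioned by CRC-32 instead of the default FNV-1a.
	cfg.Producer.Partitioner = NewCustomHashPartitioner(crc32.NewIEEE)
	return cfg
}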

// NewCustomPartitioner creates a default Partitioner but lets you specify the behavior of each component via options.
func NewCustomPartitioner(options ...HashPartitionerOption) PartitionerConstructor {
	return func(topic string) Partitioner {
		p := new(hashPartitioner)
		p.random = NewRandomPartitioner(topic)
		p.hasher = fnv.New32a()
		p.referenceAbs = false
		p.hashUnsigned = false
		for _, option := range options {
			option(p)
		}
		return p
	}
}

// NewHashPartitioner returns a Partitioner which behaves as follows. If the message's key is nil then a
// random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes of the message key is used,
// modulo the number of partitions. This ensures that messages with the same key always end up on the
// same partition.
func NewHashPartitioner(topic string) Partitioner {
	p := new(hashPartitioner)
	p.random = NewRandomPartitioner(topic)
	p.hasher = fnv.New32a()
	p.referenceAbs = false
	p.hashUnsigned = false
	return p
}
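
// A hedged usage sketch of the default hashing behaviour: with NewHashPartitioner
// installed, every message carrying the same key lands on the same partition, while
// keyless messages fall back to the random partitioner. Names used here are
// illustrative assumptions, not part of the library.
func exampleHashPartitioning() (*Config, *ProducerMessage) {
	cfg := NewConfig()
	cfg.Producer.Partitioner = NewHashPartitioner
	// Every message keyed "user-42" maps to the same partition for a given topic.
	msg := &ProducerMessage{Topic: "example-topic", Key: StringEncoder("user-42"), Value: StringEncoder("payload")}
	return cfg, msg
}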

// NewReferenceHashPartitioner is like NewHashPartitioner except that it handles absolute values
// in the same way as the reference Java implementation. NewHashPartitioner was supposed to do
// that but it had a mistake and now there are people depending on both behaviors. This will
// all go away on the next major version bump.
func NewReferenceHashPartitioner(topic string) Partitioner {
	p := new(hashPartitioner)
	p.random = NewRandomPartitioner(topic)
	p.hasher = fnv.New32a()
	p.referenceAbs = true
	p.hashUnsigned = false
	return p
}

// NewConsistentCRCHashPartitioner is like NewHashPartitioner except that it uses the *unsigned* crc32 hash
// of the encoded bytes of the message key modulo the number of partitions. This is compatible with
// librdkafka's `consistent_random` partitioner.
func NewConsistentCRCHashPartitioner(topic string) Partitioner {
	p := new(hashPartitioner)
	p.random = NewRandomPartitioner(topic)
	p.hasher = crc32.NewIEEE()
	p.referenceAbs = false
	p.hashUnsigned = true
	return p
}
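
// Reading the constructor above against NewCustomPartitioner, the same configuration
// can be spelled with options; the hedged sketch below is that equivalent form. The
// function name exampleLibrdkafkaCompatibleConstructor is hypothetical.
func exampleLibrdkafkaCompatibleConstructor() PartitionerConstructor {
	// Identical field settings to NewConsistentCRCHashPartitioner: CRC-32 hashing
	// with the hashed value treated as unsigned.
	return NewCustomPartitioner(
		WithCustomHashFunction(crc32.NewIEEE),
		WithHashUnsigned(),
	)
}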

func (p *hashPartitioner) Partition(message *ProducerMessage, numPartitions int32) (int32, error) {
	if message.Key == nil {
		return p.random.Partition(message, numPartitions)
	}
	bytes, err := message.Key.Encode()
	if err != nil {
		return -1, err
	}
	p.hasher.Reset()
	_, err = p.hasher.Write(bytes)
	if err != nil {
		return -1, err
	}
	var partition int32
	// Turns out we were doing our absolute value in a subtly different way from the upstream
	// implementation, but now we need to maintain backwards compat for people who started using
	// the old version; if referenceAbs is set we are compatible with the reference Java client
	// but not past Sarama versions.
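	// For example, with 10 partitions and Sum32() == 0x80000005 (int32 value -2147483643):
	// the referenceAbs branch masks the sign bit first, giving 0x00000005 % 10 == 5; the
	// legacy branch below computes -2147483643 % 10 == -3 and negates it to 3; and the
	// unsigned branch computes 2147483653 % 10 == 3. The modes can therefore pick
	// different partitions for the same key, which is why each behavior is preserved.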
	if p.referenceAbs {
		partition = (int32(p.hasher.Sum32()) & 0x7fffffff) % numPartitions
	} else if p.hashUnsigned {
		// librdkafka treats the hashed value as unsigned. If `hashUnsigned` is set we are compatible
		// with librdkafka's `consistent` partitioning but not past Sarama versions.
		partition = int32(p.hasher.Sum32() % uint32(numPartitions))
	} else {
		partition = int32(p.hasher.Sum32()) % numPartitions
		if partition < 0 {
			partition = -partition
		}
	}
	return partition, nil
}

func (p *hashPartitioner) RequiresConsistency() bool {
	return true
}

func (p *hashPartitioner) MessageRequiresConsistency(message *ProducerMessage) bool {
	return message.Key != nil
}