File: balance_strategy.go

package info (click to toggle)
golang-github-shopify-sarama 1.20.1-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 1,528 kB
  • sloc: sh: 106; makefile: 24
file content (129 lines) | stat: -rw-r--r-- 3,984 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package sarama

import (
	"math"
	"sort"
)

// BalanceStrategyPlan is the results of any BalanceStrategy.Plan attempt.
// It contains an allocation of topic/partitions by memberID in the form of
// a `memberID -> topic -> partitions` map.
type BalanceStrategyPlan map[string]map[string][]int32

// Add assigns a topic with a number partitions to a member.
func (p BalanceStrategyPlan) Add(memberID, topic string, partitions ...int32) {
	if len(partitions) == 0 {
		return
	}
	if _, ok := p[memberID]; !ok {
		p[memberID] = make(map[string][]int32, 1)
	}
	p[memberID][topic] = append(p[memberID][topic], partitions...)
}

// --------------------------------------------------------------------

// BalanceStrategy is used to balance topics and partitions
// across memebers of a consumer group
type BalanceStrategy interface {
	// Name uniquely identifies the strategy.
	Name() string

	// Plan accepts a map of `memberID -> metadata` and a map of `topic -> partitions`
	// and returns a distribution plan.
	Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error)
}

// --------------------------------------------------------------------

// BalanceStrategyRange is the default and assigns partitions as ranges to consumer group members.
// Example with one topic T with six partitions (0..5) and two members (M1, M2):
//   M1: {T: [0, 1, 2]}
//   M2: {T: [3, 4, 5]}
var BalanceStrategyRange = &balanceStrategy{
	name: "range",
	coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
		step := float64(len(partitions)) / float64(len(memberIDs))

		for i, memberID := range memberIDs {
			pos := float64(i)
			min := int(math.Floor(pos*step + 0.5))
			max := int(math.Floor((pos+1)*step + 0.5))
			plan.Add(memberID, topic, partitions[min:max]...)
		}
	},
}

// BalanceStrategyRoundRobin assigns partitions to members in alternating order.
// Example with topic T with six partitions (0..5) and two members (M1, M2):
//   M1: {T: [0, 2, 4]}
//   M2: {T: [1, 3, 5]}
var BalanceStrategyRoundRobin = &balanceStrategy{
	name: "roundrobin",
	coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
		for i, part := range partitions {
			memberID := memberIDs[i%len(memberIDs)]
			plan.Add(memberID, topic, part)
		}
	},
}

// --------------------------------------------------------------------

type balanceStrategy struct {
	name   string
	coreFn func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32)
}

// Name implements BalanceStrategy.
func (s *balanceStrategy) Name() string { return s.name }

// Balance implements BalanceStrategy.
func (s *balanceStrategy) Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
	// Build members by topic map
	mbt := make(map[string][]string)
	for memberID, meta := range members {
		for _, topic := range meta.Topics {
			mbt[topic] = append(mbt[topic], memberID)
		}
	}

	// Sort members for each topic
	for topic, memberIDs := range mbt {
		sort.Sort(&balanceStrategySortable{
			topic:     topic,
			memberIDs: memberIDs,
		})
	}

	// Assemble plan
	plan := make(BalanceStrategyPlan, len(members))
	for topic, memberIDs := range mbt {
		s.coreFn(plan, memberIDs, topic, topics[topic])
	}
	return plan, nil
}

type balanceStrategySortable struct {
	topic     string
	memberIDs []string
}

func (p balanceStrategySortable) Len() int { return len(p.memberIDs) }
func (p balanceStrategySortable) Swap(i, j int) {
	p.memberIDs[i], p.memberIDs[j] = p.memberIDs[j], p.memberIDs[i]
}
func (p balanceStrategySortable) Less(i, j int) bool {
	return balanceStrategyHashValue(p.topic, p.memberIDs[i]) < balanceStrategyHashValue(p.topic, p.memberIDs[j])
}

func balanceStrategyHashValue(vv ...string) uint32 {
	h := uint32(2166136261)
	for _, s := range vv {
		for _, c := range s {
			h ^= uint32(c)
			h *= 16777619
		}
	}
	return h
}