File: candidate.go

package info (click to toggle)
golang-github-docker-leadership 0.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, buster, sid
  • size: 96 kB
  • ctags: 22
  • sloc: makefile: 2
file content (150 lines) | stat: -rw-r--r-- 3,077 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package leadership

import (
	"sync"
	"time"

	"github.com/docker/libkv/store"
)

// defaultLockTTL is the TTL that initLock treats as "nothing to override":
// an explicit TTL is written into the lock options only when the candidate's
// TTL differs from this value. Presumably it mirrors the store's own default
// lock TTL — confirm against the store backend.
const defaultLockTTL = 20 * time.Second

// Candidate runs the leader election algorithm asynchronously.
//
// A Candidate is built with NewCandidate and started with RunForElection,
// which launches the campaign loop in its own goroutine and returns the
// channels on which leadership changes and errors are reported.
type Candidate struct {
	// Lock identity: client creates the lock (via NewLock), key is the key
	// the lock is created under and node is written as the lock's value.
	client store.Store
	key    string
	node   string

	// Election state and signalling.
	//
	// electedCh and errCh are created by RunForElection and closed when the
	// campaign loop returns; electedCh carries every status set by update and
	// errCh carries the error that ended the campaign.
	// lock is held by update and by Resign; leader is the status most
	// recently recorded by update.
	// lockTTL is copied into the lock options by initLock unless it equals
	// defaultLockTTL.
	// stopCh is closed by Stop. resignCh carries Resign requests to the
	// campaign loop.
	// stopRenew is the RenewLock channel of the most recently built lock
	// options; initLock closes it before building the next lock.
	electedCh chan bool
	lock      sync.Mutex
	lockTTL   time.Duration
	leader    bool
	stopCh    chan struct{}
	stopRenew chan struct{}
	resignCh  chan bool
	errCh     chan error
}

// NewCandidate creates a new Candidate that competes for the lock stored
// under key in client, using node as the value written to the lock.
//
// ttl is the lock TTL; initLock only passes it on to the store when it
// differs from defaultLockTTL. The returned candidate is idle until
// RunForElection is called.
func NewCandidate(client store.Store, key, node string, ttl time.Duration) *Candidate {
	candidate := new(Candidate)

	candidate.client = client
	candidate.key = key
	candidate.node = node
	candidate.lockTTL = ttl

	// leader keeps its zero value: a fresh candidate is a follower.
	candidate.stopCh = make(chan struct{})
	candidate.resignCh = make(chan bool)

	return candidate
}

// IsLeader returns true if the candidate is currently a leader.
//
// The value is the status most recently recorded by update.
//
// NOTE(review): the field is read without holding c.lock, whereas update
// writes it under c.lock — confirm whether callers on other goroutines need
// a synchronized read. Also note that the Stop path of campaign returns
// without calling update(false), so this can keep reporting true after Stop.
func (c *Candidate) IsLeader() bool {
	return c.leader
}

// RunForElection starts the leader election algorithm in a background
// goroutine and returns the two channels it reports on.
//
// The first channel delivers leadership changes: true when this candidate
// becomes the leader and false when it is (or goes back to being) a
// follower. The second channel delivers the error that ended the campaign.
// Both channels are unbuffered and are closed when the campaign goroutine
// exits.
func (c *Candidate) RunForElection() (<-chan bool, <-chan error) {
	elected := make(chan bool)
	errs := make(chan error)

	// The fields must be set before the goroutine starts: campaign reads them.
	c.electedCh, c.errCh = elected, errs

	go c.campaign()

	return elected, errs
}

// Stop running for election.
//
// Stop closes stopCh. The campaign loop observes this while it waits as a
// leader, releases the lock and returns, which closes the channels handed
// out by RunForElection.
//
// Calling Stop again after it has returned is a no-op instead of a
// "close of closed channel" panic. Concurrent calls to Stop are not
// synchronized with each other.
func (c *Candidate) Stop() {
	select {
	case <-c.stopCh:
		// Already stopped: closing a closed channel would panic.
	default:
		close(c.stopCh)
	}
}

// Resign forces the candidate to step-down and try again.
// If the candidate is not a leader, it doesn't have any effect.
// Candidate will retry immediately to acquire the leadership. If no-one else
// took it, then the Candidate will end up being a leader again.
//
// Resign blocks until the campaign loop picks up the request or until Stop
// has been called, whichever comes first.
func (c *Candidate) Resign() {
	// Only the leadership check needs the mutex. Holding it across the send
	// below would deadlock against update, which needs the same mutex before
	// the campaign loop can get back to receiving from resignCh.
	c.lock.Lock()
	leader := c.leader
	c.lock.Unlock()

	if !leader {
		return
	}

	select {
	case c.resignCh <- true:
	case <-c.stopCh:
		// The campaign loop exits on Stop without resetting leader, so
		// nobody would ever receive the request; give up instead of hanging.
	}
}

// update records status as the current leadership state and publishes it on
// electedCh.
//
// electedCh is unbuffered (see RunForElection) and the send happens while
// c.lock is held, so update blocks — and keeps the lock — until the consumer
// of the elected channel receives the value.
func (c *Candidate) update(status bool) {
	c.lock.Lock()
	defer c.lock.Unlock()

	c.leader = status
	c.electedCh <- status
}

// initLock builds a fresh lock on c.key with c.node as its value and returns
// it together with any error from the store's NewLock.
//
// Every call creates a new RenewLock channel and remembers it in
// c.stopRenew; the channel left over from the previous call is closed first.
// An explicit TTL is set only when c.lockTTL differs from defaultLockTTL.
func (c *Candidate) initLock() (store.Locker, error) {
	// Give up on the previous lock session, if any: we get here again after
	// losing the lock or resigning (e.g. when recovering from a store
	// failure). Presumably closing the channel stops that session's renewal.
	if previous := c.stopRenew; previous != nil {
		close(previous)
	}

	renew := make(chan struct{})
	opts := &store.LockOptions{
		Value:     []byte(c.node),
		RenewLock: renew,
	}
	if c.lockTTL != defaultLockTTL {
		opts.TTL = c.lockTTL
	}
	c.stopRenew = renew

	return c.client.NewLock(c.key, opts)
}

// campaign is the election loop that RunForElection starts in its own
// goroutine. Each round it reports follower status, builds a fresh lock with
// initLock, blocks until the lock is acquired, reports leader status and then
// waits for a resignation request, a stop request or the loss of the lock.
//
// An error from initLock or from acquiring the lock ends the campaign. The
// error is offered on errCh; if Stop has been called it may be dropped so
// that the goroutine can still terminate. electedCh and errCh are both
// closed when campaign returns.
//
// The stop path returns without calling update(false), so the leader flag
// is left as it was.
func (c *Candidate) campaign() {
	defer close(c.electedCh)
	defer close(c.errCh)

	// fail hands err to whoever reads errCh. errCh is unbuffered, so without
	// the stopCh case a caller that stopped the candidate and no longer
	// reads errCh would leave this goroutine blocked forever and both
	// channels unclosed.
	fail := func(err error) {
		select {
		case c.errCh <- err:
		case <-c.stopCh:
		}
	}

	for {
		// Start as a follower.
		c.update(false)

		lock, err := c.initLock()
		if err != nil {
			fail(err)
			return
		}

		// NOTE(review): nil is passed as Lock's argument; if that parameter
		// is a stop channel, Stop cannot interrupt a pending acquisition —
		// confirm against the store.Locker documentation.
		lostCh, err := lock.Lock(nil)
		if err != nil {
			fail(err)
			return
		}

		// Hooray! We acquired the lock therefore we are the new leader.
		c.update(true)

		select {
		case <-c.resignCh:
			// We were asked to resign, give up the lock and go back
			// campaigning. Unlock's result is deliberately ignored: the
			// next round builds a new lock either way.
			lock.Unlock()
		case <-c.stopCh:
			// Give up the leadership and quit.
			if c.leader {
				lock.Unlock()
			}
			return
		case <-lostCh:
			// We lost the lock. Someone else is the leader, try again.
		}
	}
}