File: notifier.go

package info (click to toggle)
golang-github-canonicalltd-raft-test 0.0~git20180628.c3345b5-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 284 kB
  • sloc: makefile: 2
file content (149 lines) | stat: -rw-r--r-- 4,081 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
// Copyright 2017 Canonical Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package election

import (
	"fmt"
	"log"
	"time"

	"github.com/hashicorp/raft"
)

// Notifiy about leadership changes in a single raft server.
type notifier struct {
	// For debugging raft-test itself or its consumers.
	logger *log.Logger

	// ID of the raft server we're observing.
	id raft.ServerID

	// Reference to the Config.NotifyCh object set in this server's Config.
	notifyCh chan bool

	// Channel used to tell the notification loop to expect the server to
	// acquire leadership. The leadership future sent to this channel will
	// be used both for notifying that leadership was acquired.
	futureCh chan *Future

	// Channel used to tell the notification loop to ignore any
	// notification received from the notifyCh.
	ignoreCh chan struct{}

	// Stop observing leadership changes when this channel gets closed.
	shutdownCh chan struct{}
}

// Create a new notifier.
func newNotifier(logger *log.Logger, id raft.ServerID, notifyCh chan bool) *notifier {
	observer := &notifier{
		logger:     logger,
		id:         id,
		notifyCh:   notifyCh,
		futureCh:   make(chan *Future),
		ignoreCh:   make(chan struct{}),
		shutdownCh: make(chan struct{}),
	}
	go observer.start()
	return observer
}

// Ignore any notifications received on the notifyCh.
func (n *notifier) Ignore() {
	close(n.ignoreCh)
}

// Close stops observing leadership changes.
func (n *notifier) Close() {
	n.shutdownCh <- struct{}{}
	<-n.shutdownCh
}

// Acquired returns a Leadership object when the server acquires leadership, or
// an error if the timeout expires.
//
// It must be called before this server has any chance to become leader
// (e.g. it's disconnected from the other servers).
//
// Once called, it must not be called again until leadership is lost.
func (n *notifier) Acquired(timeout time.Duration) *Future {
	future := newFuture(n.id, timeout)
	n.futureCh <- future
	return future
}

// Start observing leadership changes using the notify channel of our server
// and eed notification to our consumers.
//
// The loop will be terminated once the stopCh is closed.
func (n *notifier) start() {
	// Record the last leadership change observation. For asserting that a
	// leadership lost notification always follows a leadership acquired
	// one.
	var last bool

	// Record the last request for leadership change for this server, if
	// any.
	var future *Future
	for {
		select {
		case f := <-n.futureCh:
			if future != nil {
				panic(fmt.Sprintf("server %s: duplicate leadership request", n.id))
			}
			future = f
		case acquired := <-n.notifyCh:
			ignore := false
			select {
			case <-n.ignoreCh:
				// Just drop the notification on the floor.
				ignore = true
			default:
			}
			if ignore {
				break
			}
			if future == nil {
				panic(fmt.Sprintf("server %s: unexpected leadership change", n.id))
			}
			verb := ""
			var ch chan struct{}
			if acquired {
				verb = "acquired"
				ch = future.acquiredCh
			} else {
				verb = "lost"
				ch = future.lostCh
				future = nil

			}
			if acquired == last {
				panic(fmt.Sprintf("server %s %s leadership twice in a row", n.id, verb))
			}
			last = acquired
			n.logger.Printf("[DEBUG] raft-test: server %s: leadership: %s", n.id, verb)
			select {
			case <-ch:
				panic(fmt.Sprintf("server %s: duplicate leadership %s notification", n.id, verb))
			default:
				close(ch)
			}
		case <-n.shutdownCh:
			n.logger.Printf("[DEBUG] raft-test: server %s: leadership: stop watching", n.id)
			close(n.shutdownCh)
			return
		}
	}
}