File: schedule.go

package info (click to toggle)
golang-github-canonicalltd-raft-test 0.0~git20180628.c3345b5-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 284 kB
  • sloc: makefile: 2
file content (178 lines) | stat: -rw-r--r-- 5,127 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
// Copyright 2017 Canonical Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package network

import (
	"sync"

	"github.com/CanonicalLtd/raft-test/internal/event"
	"github.com/hashicorp/raft"
)

// schedule contains details about when a certain event should occur: on which
// peers, at which command log of the current term, and whether before or after
// that log has been appended.
type schedule struct {
	// List of peers that the event should occur on.
	peers []raft.ServerID

	// The event should fire when the transport tries to append the n'th
	// command log in this term. A value of zero means that nothing is
	// scheduled (FilterRequest never reports a match in that case).
	n uint64

	// Event object that should be fired once the fault has occurred on all
	// peers.
	event *event.Event

	// Track peers where the event already occurred. The flag at index i
	// refers to the peer at index i of the peers slice.
	occurred []bool

	// If true, the event should occur after the command log has been
	// appended to all followers.
	append bool

	// Serialize access to internal state.
	mu sync.RWMutex
}

// newSchedule returns a zero-value schedule: it has no peers and no event
// configured, so it never triggers a fault.
func newSchedule() *schedule {
	return new(schedule)
}

// AddPeer registers a server among the peers where the event should occur.
//
// The peer is appended together with a matching "not yet occurred" flag, so
// that the two slices always stay index-aligned.
func (s *schedule) AddPeer(id raft.ServerID) {
	s.peers, s.occurred = append(s.peers, id), append(s.occurred, false)
}

// NoEvent resets this schedule so that no fault occurs: the target command
// number, the event and the append flag are cleared, and every peer is marked
// as not having seen the fault yet.
func (s *schedule) NoEvent() {
	s.n, s.event, s.append = 0, nil, false
	for i := 0; i < len(s.occurred); i++ {
		s.occurred[i] = false
	}
}

// EnqueueFailure configures this schedule to fire the given event when the
// append entries RPC to apply the n'th command log has failed on all given
// peers.
//
// The per-peer occurrence flags are reset, and the schedule is explicitly
// marked as an enqueue fault.
func (s *schedule) EnqueueFailure(n uint64, event *event.Event) {
	s.n = n
	s.event = event
	for i := range s.occurred {
		s.occurred[i] = false
	}
	// Clear the append flag: if AppendFailure was configured earlier and
	// NoEvent was not called in between, the flag would otherwise still be
	// set, making IsEnqueueFault report false and FilterRequest skip the
	// truncation of the failing entry.
	s.append = false
}

// AppendFailure configures this schedule to fire the given event after the
// n'th command log has been appended by all peers but has failed to be
// notified to all consumers.
//
// The per-peer occurrence flags are reset and the schedule is marked as an
// append fault.
func (s *schedule) AppendFailure(n uint64, event *event.Event) {
	s.append = true
	s.n, s.event = n, event
	for i := range s.occurred {
		s.occurred[i] = false
	}
}

// FilterRequest scans the entries in the given append request, to see whether
// they contain the command log that this fault is supposed to trigger upon.
//
// The n parameter is the number of command logs successfully appended so far
// in the current term. Only entries of type raft.LogCommand are counted.
//
// It returns a request object and a boolean value.
//
// If the fault should not be triggered by this request (including the case
// where no fault is scheduled at all), the returned request object is the
// same as the given one and the boolean value is false.
//
// If the fault should be triggered by this request, the boolean value is true
// and for the returned request object there are two cases:
//
// 1) If this is an enqueue fault, the returned request object is a shallow
//    copy of the given one with its Entries truncated to exclude the failing
//    command log entry and every entry beyond that. This way all logs
//    preceding the failing command log will still be appended to the peer and
//    the associated apply futures will succeed, although the failing command
//    log won't be applied and its apply future will fail with
//    ErrLeadershipLost.
//
// 2) If this is an append fault, the returned request object is the same as
//    the given one. This way all logs will be appended to the peer, although
//    the transport pretends that the append entries RPC has failed,
//    simulating a disconnection when delivering the RPC reply.
//
func (s *schedule) FilterRequest(n uint64, args *raft.AppendEntriesRequest) (*raft.AppendEntriesRequest, bool) {
	// Nothing is scheduled.
	if s.n == 0 {
		return args, false
	}

	seen := n
	for idx, entry := range args.Entries {
		// Only command log entries take part in the count.
		if entry.Type != raft.LogCommand {
			continue
		}
		seen++
		if seen != s.n {
			continue
		}

		// This entry is the one the fault is scheduled for.
		if s.append {
			// Append fault: the request goes through untouched.
			return args, true
		}

		// Enqueue fault: hand back a copy of the request that stops right
		// before the failing entry, leaving the caller's request unmodified.
		truncated := *args
		truncated.Entries = args.Entries[:idx]
		return &truncated, true
	}

	return args, false
}

// Command returns the command log sequence number that should trigger this
// fault.
//
// For example, if the fault was set to fail at the n'th command log appended
// during the term, then n is returned. Zero is returned when no fault is
// scheduled.
func (s *schedule) Command() uint64 {
	return s.n
}

// IsEnqueueFault returns true if this is an enqueue fault, i.e. the append
// flag is not set.
func (s *schedule) IsEnqueueFault() bool {
	return !s.append
}

// IsAppendFault returns true if this is an append fault, i.e. the append flag
// is set.
func (s *schedule) IsAppendFault() bool {
	return s.append
}

// OccurredOn marks the fault as occurred on the given server, and fires the
// event if it has occurred on all servers.
//
// Servers that are not among the registered peers are ignored. If no event is
// currently configured, nothing is fired. The whole operation runs while
// holding the schedule's mutex.
func (s *schedule) OccurredOn(id raft.ServerID) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Flag every registered peer matching the given ID.
	for i, peer := range s.peers {
		if peer == id {
			s.occurred[i] = true
		}
	}

	// Bail out as long as at least one peer has not seen the fault yet.
	for _, done := range s.occurred {
		if !done {
			return
		}
	}

	// Guard against a schedule with no event attached (freshly created, or
	// reset with NoEvent): the check above is vacuously satisfied when there
	// are no peers, and calling Fire on a nil event must be avoided.
	if s.event == nil {
		return
	}
	s.event.Fire()
}