File: network.go

package info (click to toggle)
golang-github-canonicalltd-raft-test 0.0~git20180628.c3345b5-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 284 kB
  • sloc: makefile: 2
file content (147 lines) | stat: -rw-r--r-- 5,551 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// Copyright 2017 Canonical Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package network

import (
	"fmt"
	"log"

	"github.com/CanonicalLtd/raft-test/internal/event"
	"github.com/hashicorp/raft"
)

// Network provides control over all transports of a cluster, injecting
// disconnections and failures.
type Network struct {
	// Logger used for the debug messages emitted by the Network methods; it
	// is also handed to every transport wrapper created by Add.
	logger *log.Logger

	// Transport wrappers, indexed by the ID of the server they were added for.
	transports map[raft.ServerID]*eventTransport
}

// New creates a network for controlling the underlying transports. The
// returned network holds no transports until Add is called, and uses the
// given logger for its messages.
func New(logger *log.Logger) *Network {
	network := &Network{logger: logger}
	network.transports = make(map[raft.ServerID]*eventTransport)

	return network
}

// Add wraps the given transport with instrumentation to inject
// disconnections and failures, stores the wrapper in the network under the
// given server ID and returns it. The wrapper and every transport already in
// the network are added as peers of one another.
func (n *Network) Add(id raft.ServerID, trans raft.Transport) raft.Transport {
	wrapped := newEventTransport(n.logger, id, trans)

	// Pair the newcomer with each transport added so far, in both directions.
	for _, peer := range n.transports {
		wrapped.AddPeer(peer)
		peer.AddPeer(wrapped)
	}

	n.transports[id] = wrapped

	return wrapped
}

// Electing prepares the transport of the server with the given ID for
// becoming leader: any leader-related state (such as the track of logs
// appended by the peers) is reset and the transport gets connected to all its
// peers, so it can send them RPCs. Call it whenever that server is about to
// transition to the leader state, before any append entries RPC is made.
//
// It panics if any transport in the network reports being connected.
func (n *Network) Electing(id raft.ServerID) {
	n.logger.Printf("[DEBUG] raft-test: server %s: establish outbound connection to all other nodes", id)

	// No transport may be connected at this point; bail out loudly otherwise.
	for serverID, t := range n.transports {
		if !t.Connected() {
			continue
		}
		panic(fmt.Sprintf("expected a fully disconected network, but server %s is connected", serverID))
	}

	n.transports[id].Electing()
}

// Deposing cuts connectivity from the transport of the server with the given
// ID to all its peers. Only append entries RPCs towards peers that are lagging
// behind in terms of applied logs are still allowed to be performed.
func (n *Network) Deposing(id raft.ServerID) {
	n.logger.Printf("[DEBUG] raft-test: server %s: dropping outbound connection to all other nodes", id)

	transport := n.transports[id]
	transport.Deposing()
}

// ConnectAllServers establishes full cluster connectivity after an election.
// The given ID is the one of the leader, which is already connected and is
// therefore left untouched.
func (n *Network) ConnectAllServers(id raft.ServerID) {
	// Connect the peers of every transport except the leader's.
	for serverID, transport := range n.transports {
		if serverID != id {
			transport.peers.Connect()
		}
	}
}

// Disconnect cuts connectivity from the transport of the leader server with
// the given ID towards the peer identified by follower.
func (n *Network) Disconnect(id, follower raft.ServerID) {
	n.logger.Printf("[DEBUG] raft-test: server %s: disconnecting follower %s", id, follower)

	leader := n.transports[id]
	leader.Disconnect(follower)
}

// Reconnect restores connectivity from the transport of the leader server
// with the given ID towards the peer identified by follower.
func (n *Network) Reconnect(id, follower raft.ServerID) {
	n.logger.Printf("[DEBUG] raft-test: server %s: reconnecting follower %s", id, follower)

	leader := n.transports[id]
	leader.Reconnect(follower)
}

// PeerConnected reports whether the given peer is connected with the
// transport of the server with the given ID.
func (n *Network) PeerConnected(id, peer raft.ServerID) bool {
	transport := n.transports[id]

	return transport.PeerConnected(peer)
}

// Address returns the local address of the transport belonging to the server
// with the given ID.
func (n *Network) Address(id raft.ServerID) raft.ServerAddress {
	transport := n.transports[id]

	return transport.LocalAddr()
}

// HasAppendedLogsFromTo reports whether the server with id1 has appended at
// least one log entry to the server with id2.
//
// The server with id1 is assumed to be a freshly elected leader that has been
// trying to append a noop log to all its followers.
func (n *Network) HasAppendedLogsFromTo(id1, id2 raft.ServerID) bool {
	return n.transports[id1].HasAppendedLogsTo(id2)
}

// ScheduleEnqueueFailure arranges for all followers of the server with the
// given ID to fail when that leader tries to append its command'th log
// command. The returned event fires once all of them have failed, and keeps
// them all blocked until it is acknowledged.
func (n *Network) ScheduleEnqueueFailure(id raft.ServerID, command uint64) *event.Event {
	return n.transports[id].ScheduleEnqueueFailure(command)
}

// ScheduleAppendFailure arranges for all followers of the leader server with
// the given ID to append the command'th log command sent by the leader while
// failing to acknowledge the leader about it. The returned event fires once
// all of them have failed, and keeps them all blocked until it is
// acknowledged.
func (n *Network) ScheduleAppendFailure(id raft.ServerID, command uint64) *event.Event {
	return n.transports[id].ScheduleAppendFailure(command)
}