1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
|
package integration
import (
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"strings"
"google.golang.org/grpc"
agentutils "github.com/docker/swarmkit/agent/testutils"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/ca"
"github.com/docker/swarmkit/node"
"github.com/docker/swarmkit/testutils"
)
// testNode wraps a *node.Node together with the configuration it was built
// from and the directory holding its state, so tests can reach them later.
type testNode struct {
	// config is the configuration handed to node.New. Pause mutates it and
	// reuses it when re-creating the node.
	config *node.Config
	// node is the current node instance; Pause replaces it with a fresh one.
	node *node.Node
	// stateDir is the temporary directory used as the node's StateDir. Stop
	// removes it.
	stateDir string
}
// generateCerts writes TLS material for a node beneath tmpDir/certificates,
// overwriting whatever is already there.
//
// The root CA certificate is always written; the root CA key is written only
// when writeKey is true. A node certificate for nodeID/role/org is then issued
// from rootCA and saved at the node paths. The first error encountered is
// returned.
func generateCerts(tmpDir string, rootCA *ca.RootCA, nodeID, role, org string, writeKey bool) error {
	rootSigner, err := rootCA.Signer()
	if err != nil {
		return err
	}

	dir := filepath.Join(tmpDir, "certificates")
	if err = os.MkdirAll(dir, 0700); err != nil {
		return err
	}

	paths := ca.NewConfigPaths(dir)
	if err = ioutil.WriteFile(paths.RootCA.Cert, rootSigner.Cert, 0644); err != nil {
		return err
	}
	if writeKey {
		// The key is written with owner-only permissions.
		if err = ioutil.WriteFile(paths.RootCA.Key, rootSigner.Key, 0600); err != nil {
			return err
		}
	}

	nodeKeys := ca.NewKeyReadWriter(paths.Node, nil, nil)
	_, _, err = rootCA.IssueAndSaveNewCertificates(nodeKeys, nodeID, role, org)
	return err
}
// newTestNode creates a new node backed by a fresh temporary state directory
// and wraps it in a testNode.
//
// joinAddr and joinToken are passed through to the node configuration; per the
// original notes, an empty joinAddr means a new cluster will be initialized.
// The node uses agentutils.TestExecutor as its executor and listens for the
// control API on a "control.sock" socket inside the state directory. If
// lateBind is set, ListenRemoteAPI is left unset so the remote API port is not
// bound; otherwise it is set to "127.0.0.1:0". fips is forwarded to the
// configuration's FIPS field.
//
// On failure no testNode is returned, and the temporary directory created for
// the node is removed so that failed constructions do not leak directories.
func newTestNode(joinAddr, joinToken string, lateBind bool, fips bool) (*testNode, error) {
	tmpDir, err := ioutil.TempDir("", "swarmkit-integration-")
	if err != nil {
		return nil, err
	}

	cfg := &node.Config{
		ListenControlAPI: filepath.Join(tmpDir, "control.sock"),
		JoinAddr:         joinAddr,
		StateDir:         tmpDir,
		Executor:         &agentutils.TestExecutor{},
		JoinToken:        joinToken,
		FIPS:             fips,
	}
	if !lateBind {
		cfg.ListenRemoteAPI = "127.0.0.1:0"
	}

	// Named n rather than node so the local does not shadow the node package.
	n, err := node.New(cfg)
	if err != nil {
		// Nobody else knows about tmpDir yet, so clean it up here. This is
		// best-effort: the construction error is the one worth reporting.
		_ = os.RemoveAll(tmpDir)
		return nil, err
	}

	return &testNode{
		config:   cfg,
		node:     n,
		stateDir: tmpDir,
	}, nil
}
// Pause stops the running node and replaces it with a newly created one that
// reuses the same configuration (and therefore the same state directory).
//
// The replacement listens on the remote API address the old node reported; if
// that address could not be obtained, "127.0.0.1:0" is used instead.
// forceNewCluster is copied into the configuration's ForceNewCluster field.
// Note that the stored configuration is modified in place.
func (n *testNode) Pause(forceNewCluster bool) error {
	// Capture the address before stopping the node.
	remoteAddr, addrErr := n.node.RemoteAPIAddr()
	if addrErr != nil {
		remoteAddr = "127.0.0.1:0"
	}

	if stopErr := n.stop(); stopErr != nil {
		return stopErr
	}

	n.config.ListenRemoteAPI = remoteAddr
	// Clear the join settings: if JoinAddr is set, the node will connect to
	// the join addr and ignore any other remotes that are stored in the raft
	// directory.
	n.config.JoinAddr = ""
	n.config.JoinToken = ""
	n.config.ForceNewCluster = forceNewCluster

	restarted, err := node.New(n.config)
	if err != nil {
		return err
	}
	n.node = restarted
	return nil
}
// stop shuts the node down, allowing at most opsTimeout for it to finish.
//
// A "node: not started" error from the node is treated as success. Any other
// error causes a dump of all goroutine stacks to stderr and is returned with
// the node's role and ID in the message.
func (n *testNode) stop() error {
	ctx, cancel := context.WithTimeout(context.Background(), opsTimeout)
	defer cancel()

	// The role is sampled before Stop is called and used for the error text.
	wasManager := n.IsManager()

	err := n.node.Stop(ctx)
	if err == nil {
		return nil
	}
	// Stopping a node that was already stopped is not a failure.
	if strings.Contains(err.Error(), "node: not started") {
		return nil
	}

	// TODO(aaronl): This stack dumping may be removed in the
	// future once context deadline issues while shutting down
	// nodes are resolved.
	//
	// runtime.Stack truncates to the buffer size, so keep doubling the
	// buffer until the dump fits with room to spare.
	stack := make([]byte, 1024)
	for {
		written := runtime.Stack(stack, true)
		if written < len(stack) {
			stack = stack[:written]
			break
		}
		stack = make([]byte, 2*len(stack))
	}
	os.Stderr.Write(stack)

	if !wasManager {
		return fmt.Errorf("error stop worker %s: %v", n.node.NodeID(), err)
	}
	return fmt.Errorf("error stop manager %s: %v", n.node.NodeID(), err)
}
// Stop shuts the node down and then deletes its state directory. If the
// shutdown fails, the directory is left in place and that error is returned;
// otherwise the result of removing the directory is returned.
func (n *testNode) Stop() error {
	err := n.stop()
	if err == nil {
		err = os.RemoveAll(n.stateDir)
	}
	return err
}
// ControlClient returns a gRPC client for the node's control API.
//
// It asks the node for its control socket connection and checks for one via
// testutils.PollFuncWithTimeout, bounded by opsTimeout. If no connection is
// obtained, the error from the poll helper is returned.
//
// NOTE(review): behavior on non-manager nodes depends on
// node.ListenControlSocket, which is not visible here — confirm.
func (n *testNode) ControlClient(ctx context.Context) (api.ControlClient, error) {
	ctx, cancel := context.WithTimeout(ctx, opsTimeout)
	defer cancel()

	connCh := n.node.ListenControlSocket(ctx)

	var conn *grpc.ClientConn
	// tryReceive does a non-blocking read from the channel and reports
	// whether a usable connection has been obtained so far.
	tryReceive := func() error {
		select {
		case conn = <-connCh:
		default:
		}
		if conn != nil {
			return nil
		}
		return fmt.Errorf("didn't get control api connection")
	}

	if err := testutils.PollFuncWithTimeout(nil, tryReceive, opsTimeout); err != nil {
		return nil, err
	}
	return api.NewControlClient(conn), nil
}
// IsManager reports whether the wrapped node currently has a non-nil manager.
func (n *testNode) IsManager() bool {
	mgr := n.node.Manager()
	return mgr != nil
}
|