File: server_java_test.go

package info (click to toggle)
golang-github-go-zookeeper-zk 1.0.4-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 320 kB
  • sloc: makefile: 43
file content (171 lines) | stat: -rw-r--r-- 5,384 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package zk

import (
	"context"
	"fmt"
	"io"
	"os"
	"os/exec"
	"path/filepath"
	"testing"
)

// ErrMissingServerConfigField is the error reported when a mandatory
// ServerConfig field has been left empty. Its value names the missing field.
type ErrMissingServerConfigField string

// Error implements the error interface and quotes the missing field name.
func (e ErrMissingServerConfigField) Error() string {
	field := string(e)
	// All operands are strings, so Sprint joins them without adding spaces.
	return fmt.Sprint("zk: missing server config field '", field, "'")
}

// Fallback values used when writing a server configuration whose
// corresponding setting is zero or negative.
const (
	// DefaultServerTickTime is the fallback tickTime, in milliseconds.
	DefaultServerTickTime = 500
	// DefaultServerInitLimit is the fallback initLimit, in ticks.
	DefaultServerInitLimit = 10
	// DefaultServerSyncLimit is the fallback syncLimit, in ticks.
	DefaultServerSyncLimit = 5
	// DefaultServerAutoPurgeSnapRetainCount is the fallback number of
	// snapshots to retain when auto purge is enabled.
	DefaultServerAutoPurgeSnapRetainCount = 3
	// DefaultPeerPort is the fallback peer port of a server list entry.
	DefaultPeerPort = 2888
	// DefaultLeaderElectionPort is the fallback leader election port of a
	// server list entry.
	DefaultLeaderElectionPort = 3888
)

// server describes one external server command used by the integration
// tests: what to run, where its output goes and how to kill it.
type server struct {
	// stdout and stderr are attached to the command's output streams.
	stdout io.Writer
	stderr io.Writer

	// cmdString is the executable to run and cmdArgs its arguments.
	cmdString string
	cmdArgs   []string
	// cmdEnv holds environment entries added on top of the inherited
	// environment.
	cmdEnv []string

	// cmd is the command created by Start.
	cmd *exec.Cmd
	// cancelFunc cancels the context the command runs under, which kills the
	// command, i.e. the server itself.
	cancelFunc context.CancelFunc
}

// NewIntegrationTestServer prepares, but does not start, a ZooKeeper server
// command for integration tests.
//
// The directory holding zkServer.sh is taken from the ZOOKEEPER_BIN_PATH
// environment variable and defaults to the relative path "zookeeper/bin".
// configPath is handed to zkServer.sh as its configuration file. stdout and
// stderr are attached to the command's output when it is started. t is
// currently unused.
//
// An error is returned when the bin path cannot be inspected, whether because
// it does not exist or for any other reason (for example a permission
// problem); the underlying error is wrapped.
func NewIntegrationTestServer(t *testing.T, configPath string, stdout, stderr io.Writer) (*server, error) {
	// Allow external systems to configure this zk server bin path.
	zkPath := os.Getenv("ZOOKEEPER_BIN_PATH")
	if zkPath == "" {
		// Default to a static relative path that can be set up by a build system.
		zkPath = "zookeeper/bin"
	}
	if _, err := os.Stat(zkPath); err != nil {
		if os.IsNotExist(err) {
			return nil, fmt.Errorf("zk: could not find testing zookeeper bin path at %q: %w", zkPath, err)
		}
		// Any other failure also means the bin path is unusable; report it
		// here instead of failing later when the script is executed.
		return nil, fmt.Errorf("zk: could not access testing zookeeper bin path at %q: %w", zkPath, err)
	}
	// password is 'test'
	superString := `SERVER_JVMFLAGS=-Dzookeeper.DigestAuthenticationProvider.superDigest=super:D/InIHSb7yEEbrWz8b9l71RjZJU=`
	// enable TTL
	superString += ` -Dzookeeper.extendedTypesEnabled=true -Dzookeeper.emulate353TTLNodes=true`

	return &server{
		cmdString: filepath.Join(zkPath, "zkServer.sh"),
		cmdArgs:   []string{"start-foreground", configPath},
		cmdEnv:    []string{superString},
		stdout:    stdout,
		stderr:    stderr,
	}, nil
}

// Start launches the configured command under a fresh cancellable context
// and records both the command and the cancel function on srv. The command
// inherits the current process environment extended with srv.cmdEnv. The
// error from starting the command, if any, is returned.
func (srv *server) Start() error {
	runCtx, kill := context.WithCancel(context.Background())

	proc := exec.CommandContext(runCtx, srv.cmdString, srv.cmdArgs...)
	proc.Stdout, proc.Stderr = srv.stdout, srv.stderr
	// Inherited environment first, server specific entries appended after it.
	proc.Env = append(os.Environ(), srv.cmdEnv...)

	srv.cancelFunc, srv.cmd = kill, proc
	return proc.Start()
}

// Stop cancels the context the command was started with and waits for the
// command to finish, returning the result of that wait.
//
// If Start has not been called on srv there is no command to stop; an error
// is returned instead of dereferencing the unset fields.
func (srv *server) Stop() error {
	if srv.cancelFunc == nil || srv.cmd == nil {
		return fmt.Errorf("zk: server was never started")
	}
	srv.cancelFunc()
	return srv.cmd.Wait()
}

// ServerConfigServer describes one entry of the server list in a server
// configuration.
type ServerConfigServer struct {
	// ID is the numeric identifier of the server.
	ID int
	// Host is the host part of the server's address.
	Host string
	// PeerPort is the port used between peers.
	PeerPort int
	// LeaderElectionPort is the port used for leader election.
	LeaderElectionPort int
}

// ServerConfig holds the settings of a ZooKeeper server configuration.
type ServerConfig struct {
	// TickTime is the number of milliseconds of each tick.
	TickTime int
	// InitLimit is the number of ticks that the initial synchronization
	// phase can take.
	InitLimit int
	// SyncLimit is the number of ticks that can pass between sending a
	// request and getting an acknowledgement.
	SyncLimit int
	// DataDir is the directory where the snapshot is stored.
	DataDir string
	// ClientPort is the port at which clients will connect.
	ClientPort int
	// AutoPurgeSnapRetainCount is the number of snapshots to retain in
	// dataDir.
	AutoPurgeSnapRetainCount int
	// AutoPurgePurgeInterval is the purge task interval in hours (0 to
	// disable auto purge).
	AutoPurgePurgeInterval int
	// Servers is the list of servers taking part in the configuration.
	Servers []ServerConfigServer
}

// Marshall writes sc to w as a server configuration, one key=value setting
// per line.
//
// DataDir is mandatory: when it is empty ErrMissingServerConfigField("dataDir")
// is returned and nothing is written. Non-positive TickTime, InitLimit,
// SyncLimit and ClientPort are replaced by their defaults. The autopurge
// settings are only written when AutoPurgePurgeInterval is positive. With
// fewer than two Servers no server list is written; otherwise standalone mode
// is disabled and one server.<ID> line is written per entry, using the
// default peer and leader election ports where those are not positive.
//
// The first error reported by w is returned, and no further writes are
// attempted after it.
func (sc ServerConfig) Marshall(w io.Writer) error {
	// Validate before emitting anything so a rejected config leaves w untouched.
	if sc.DataDir == "" {
		return ErrMissingServerConfigField("dataDir")
	}

	// writef remembers the first write failure and turns every later call
	// into a no-op, so the settings below can be listed without repeating
	// the error check after each line.
	var werr error
	writef := func(format string, args ...interface{}) {
		if werr == nil {
			_, werr = fmt.Fprintf(w, format, args...)
		}
	}
	// orDefault substitutes def for a non-positive value.
	orDefault := func(value, def int) int {
		if value <= 0 {
			return def
		}
		return value
	}

	// The admin server is not wanted in test cases as it slows the startup
	// process and is of little unit test value.
	writef("admin.enableServer=false\n")
	writef("dataDir=%s\n", sc.DataDir)
	writef("tickTime=%d\n", orDefault(sc.TickTime, DefaultServerTickTime))
	writef("initLimit=%d\n", orDefault(sc.InitLimit, DefaultServerInitLimit))
	writef("syncLimit=%d\n", orDefault(sc.SyncLimit, DefaultServerSyncLimit))
	writef("clientPort=%d\n", orDefault(sc.ClientPort, DefaultPort))
	if sc.AutoPurgePurgeInterval > 0 {
		writef("autopurge.snapRetainCount=%d\n", orDefault(sc.AutoPurgeSnapRetainCount, DefaultServerAutoPurgeSnapRetainCount))
		writef("autopurge.purgeInterval=%d\n", sc.AutoPurgePurgeInterval)
	}
	// Enable reconfig.
	// TODO: allow setting this
	writef("reconfigEnabled=true\n")
	writef("4lw.commands.whitelist=*\n")

	if len(sc.Servers) < 2 {
		// With fewer than two servers the server list is left out so the
		// server starts in standalone mode, see
		// https://zookeeper.apache.org/doc/current/zookeeperStarted.html#sc_InstallingSingleMode
		// for more details.
		return werr
	}
	// With more than one server, force distributed mode.
	writef("standaloneEnabled=false\n")

	for _, member := range sc.Servers {
		writef("server.%d=%s:%d:%d\n",
			member.ID,
			member.Host,
			orDefault(member.PeerPort, DefaultPeerPort),
			orDefault(member.LeaderElectionPort, DefaultLeaderElectionPort),
		)
	}
	return werr
}

// waitForSession blocks until eventChan delivers an event whose State is
// StateHasSession and then returns nil. Events carrying any other state are
// skipped. It returns an error if eventChan is closed before such an event
// arrives, and ctx.Err() if ctx is done first.
func waitForSession(ctx context.Context, eventChan <-chan Event) error {
	// Keep receiving: a single receive would report success on whatever
	// event happens to arrive first, whether or not a session exists yet.
	for {
		select {
		case event, ok := <-eventChan:
			// A closed channel means no further events will arrive, so the
			// wanted state can no longer be reached.
			if !ok {
				return fmt.Errorf("connection closed before state reached")
			}
			if event.State == StateHasSession {
				return nil
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}