File: discovery.go

package info (click to toggle)
golang-github-coreos-discovery-etcd-io 2.0.0%2Bgit2019.04.19.git.78fb45d3c9-4
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bullseye, forky, sid, trixie
  • size: 23,460 kB
  • sloc: asm: 452; sh: 94; ansic: 24; makefile: 16
file content (140 lines) | stat: -rw-r--r-- 2,997 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package e2e

import (
	"fmt"
	"net/http"
	"strings"
	"sync/atomic"
	"time"

	"github.com/coreos/etcd/pkg/expect"
)

// discoveryProcessConfig holds the settings used to launch a discovery
// server binary as a child process. NewDiscoveryProcess copies it and fills
// in discoveryHost and webPort on the copy.
type discoveryProcessConfig struct {
	execPath      string // path of the discovery binary to execute
	etcdEp        string // value passed to the binary's --etcd flag
	discoveryHost string // value passed to the binary's --host flag; NewDiscoveryProcess sets it to "http://localhost:<webPort>"
	webPort       int    // port passed to the binary as --addr ":<webPort>"; assigned by NewDiscoveryProcess
}

// discoveryProcess is a handle on one discovery server child process.
// It is created by discoveryProcessConfig.NewDiscoveryProcess, launched with
// Start and terminated with Stop.
type discoveryProcess struct {
	cfg   *discoveryProcessConfig // private copy of the configuration, made by NewDiscoveryProcess
	proc  *expect.ExpectProcess   // child process handle; set by Start, nil before that
	donec chan struct{}           // created by Start; closed when waitReady returns
}

// discoveryBasePort is the next web port handed out by NewDiscoveryProcess.
// It starts at 30000 and is read and advanced through sync/atomic.
var discoveryBasePort = int32(30000)

// NewDiscoveryProcess creates a new 'discoveryProcess' from a copy of cfg.
//
// Each call reserves a fresh port from the package-level discoveryBasePort
// counter and overwrites webPort and discoveryHost in the copy so that they
// refer to that port on localhost. The receiver itself is not modified, and
// the returned process is not started; call Start to launch it.
func (cfg *discoveryProcessConfig) NewDiscoveryProcess() *discoveryProcess {
	// Reserve the port with a single atomic add. A separate load followed by
	// an add lets two concurrent callers observe the same counter value and
	// end up sharing one port.
	const portStride = 2
	port := int(atomic.AddInt32(&discoveryBasePort, portStride)) - portStride

	copied := *cfg
	copied.webPort = port
	copied.discoveryHost = fmt.Sprintf("http://localhost:%d", copied.webPort)

	return &discoveryProcess{cfg: &copied}
}

// Stop terminates the discovery process, waiting at most d for the
// underlying expect process's Stop call to return.
//
// It returns whatever that Stop call returns, or a timeout error if it has
// not returned within d. If the process was never started (dp.proc is nil)
// there is nothing to stop and Stop returns nil.
func (dp *discoveryProcess) Stop(d time.Duration) (err error) {
	proc := dp.proc
	if proc == nil {
		// Start was never called, or it failed before a child was spawned.
		// Avoid invoking Stop on a nil *expect.ExpectProcess in a goroutine.
		return nil
	}

	// Buffered so the goroutine can deliver its result and exit even when
	// the timeout below has already fired and nobody is receiving.
	errc := make(chan error, 1)
	go func() { errc <- proc.Stop() }()

	timer := time.NewTimer(d)
	defer timer.Stop() // release the timer as soon as Stop returns

	select {
	case err = <-errc:
		return err
	case <-timer.C:
		return fmt.Errorf("took longer than %v to Stop cluster", d)
	}
}

// Start launches the discovery binary described by dp.cfg and blocks until
// waitReady reports the outcome or 10 seconds elapse.
//
// The child is spawned as:
//
//	<execPath> --etcd <etcdEp> --host <discoveryHost> --addr :<webPort>
//
// Start returns the spawn error if the child cannot be created, the error
// from waitReady if readiness detection fails, or a timeout error after 10
// seconds. On timeout the child is not stopped here; it stays in dp.proc so
// the caller can call Stop.
func (dp *discoveryProcess) Start() error {
	args := []string{
		dp.cfg.execPath,
		"--etcd", dp.cfg.etcdEp,
		"--host", dp.cfg.discoveryHost,
		"--addr", fmt.Sprintf(":%d", dp.cfg.webPort),
	}
	child, err := expect.NewExpect(args[0], args[1:]...)
	if err != nil {
		return err
	}
	dp.proc = child
	dp.donec = make(chan struct{})

	// Buffered so the waiter goroutine can always deliver its result and
	// exit. With an unbuffered channel it would block forever on the send
	// once the timeout branch below has returned.
	readyC := make(chan error, 1)
	go func() {
		readyC <- dp.waitReady()
	}()

	timer := time.NewTimer(10 * time.Second)
	defer timer.Stop() // release the timer when readiness arrives first

	select {
	case err = <-readyC:
		return err
	case <-timer.C:
		return fmt.Errorf("timed out waiting for discover server")
	}
}

// waitReady waits for the child process to pass the discovery readiness
// check and returns that check's result. dp.donec is closed on the way out,
// whether or not the check succeeded.
func (dp *discoveryProcess) waitReady() error {
	// Capture the channel now so the one that was current on entry is the
	// one that gets closed.
	done := dp.donec
	defer func() { close(done) }()

	readyErr := waitReadyExpectProcDiscovery(dp.proc)
	return readyErr
}

// waitReadyExpectProcDiscovery waits, via exproc.ExpectFunc, until every
// readiness marker of the discovery server has been observed in the text
// handed to the matcher, and returns the error ExpectFunc reports.
//
// Each marker must be seen at least once. Markers may arrive in any order,
// in the same piece of text or in different ones. A marker that shows up
// repeatedly is counted once, so it cannot stand in for a marker that has
// not been seen yet.
func waitReadyExpectProcDiscovery(exproc *expect.ExpectProcess) error {
	readyStrs := []string{"discovery server started", "discovery serving on"}

	// seen[i] records whether readyStrs[i] has been observed; remaining is
	// the number of markers still outstanding.
	seen := make([]bool, len(readyStrs))
	remaining := len(readyStrs)

	matchSet := func(l string) bool {
		for i, s := range readyStrs {
			if !seen[i] && strings.Contains(l, s) {
				seen[i] = true
				remaining--
			}
		}
		return remaining == 0
	}
	_, err := exproc.ExpectFunc(matchSet)
	return err
}

// cURLReq describes a single HTTP request that Send performs by running the
// curl command-line tool.
type cURLReq struct {
	timeout  time.Duration         // when > 0, passed to curl as --max-time (whole seconds)
	endpoint string                // URL handed to curl (redirects are followed via -L)
	method   string                // HTTP method; only POST and PUT add "-X <method> -d <data>" to the command
	data     string                // body for POST/PUT; sent as "value=<data>" unless it starts with "{"
	expFunc  func(txt string) bool // predicate handed to ExpectFunc on curl's output; Send returns the line ExpectFunc yields
}

// Send runs curl for the request and returns the output line selected by
// req.expFunc.
//
// The command line is built as follows:
//   - "--max-time <secs>" is added when req.timeout > 0, with the timeout
//     rounded up to whole seconds;
//   - "-L <endpoint>" is always added;
//   - for POST and PUT, "-X <method> -d <data>" is added, where data that
//     does not start with "{" is sent as "value=<data>".
//
// Send returns an error if curl cannot be spawned, if ExpectFunc fails, or
// if closing the curl process fails. In the last case the matched line is
// still returned alongside the error.
func (req cURLReq) Send() (string, error) {
	cmdArgs := []string{"curl"}
	if req.timeout > 0 {
		// Round up rather than truncate: a sub-second timeout would
		// otherwise become "--max-time 0", which curl treats as no limit.
		secs := int(req.timeout / time.Second)
		if req.timeout%time.Second != 0 {
			secs++
		}
		cmdArgs = append(cmdArgs, "--max-time", fmt.Sprint(secs))
	}
	cmdArgs = append(cmdArgs, "-L", req.endpoint)
	switch req.method {
	case http.MethodPost, http.MethodPut:
		dt := req.data
		if !strings.HasPrefix(dt, "{") { // for non-JSON value
			dt = "value=" + dt
		}
		cmdArgs = append(cmdArgs, "-X", req.method, "-d", dt)
	}

	proc, err := expect.NewExpect(cmdArgs[0], cmdArgs[1:]...)
	if err != nil {
		return "", err
	}

	line, err := proc.ExpectFunc(req.expFunc)
	if err != nil {
		// Best-effort cleanup; the ExpectFunc error is the one reported.
		proc.Close()
		return "", err
	}
	return line, proc.Close()
}