1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
|
package integration
import (
"context"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"os"
"testing"
"time"
discoveryhttp "github.com/coreos/discovery.etcd.io/http"
"github.com/coreos/etcd/embed"
"github.com/coreos/etcd/etcdserver/api/v3client"
)
// Service contains test discovery server components.
type Service struct {
rootCtx context.Context
rootCancel func()
cfg *embed.Config
dataDir string
etcdCURL url.URL
etcd *embed.Etcd
httpEp string
httpServer *http.Server
httpErrc chan error
}
const testDiscoveryHost = "handler-test"
// NewService creates a new service.
func NewService(t *testing.T, etcdClientPort, etcdPeerPort, httpPort int) *Service {
dataDir, err := ioutil.TempDir(os.TempDir(), "test-data")
if err != nil {
t.Fatal(err)
}
cfg := embed.NewConfig()
cfg.ClusterState = embed.ClusterStateFlagNew
cfg.Name = "test-etcd"
cfg.Dir = dataDir
curl := url.URL{Scheme: "http", Host: fmt.Sprintf("localhost:%d", etcdClientPort)}
cfg.ACUrls, cfg.LCUrls = []url.URL{curl}, []url.URL{curl}
purl := url.URL{Scheme: "http", Host: fmt.Sprintf("localhost:%d", etcdPeerPort)}
cfg.APUrls, cfg.LPUrls = []url.URL{purl}, []url.URL{purl}
cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, cfg.APUrls[0].String())
cfg.SnapCount = 1000 // single-node, keep minimum snapshot
// TODO: enable this with etcd v3.3+
// cfg.AutoCompactionMode = compactor.ModePeriodic
// cfg.AutoCompactionRetention = 1
ctx, cancel := context.WithCancel(context.Background())
return &Service{
rootCtx: ctx,
rootCancel: cancel,
cfg: cfg,
dataDir: dataDir,
etcdCURL: curl,
httpEp: fmt.Sprintf("http://localhost:%d", httpPort),
httpServer: &http.Server{
Addr: fmt.Sprintf("localhost:%d", httpPort),
Handler: discoveryhttp.RegisterHandlers(ctx, cfg.LCUrls[0].String(), testDiscoveryHost),
},
httpErrc: make(chan error),
}
}
// Start starts etcd server and http listener.
func (sv *Service) Start(t *testing.T) <-chan error {
srv, err := embed.StartEtcd(sv.cfg)
if err != nil {
t.Fatal(err)
}
select {
case <-srv.Server.ReadyNotify():
err = nil
case err = <-srv.Err():
case <-srv.Server.StopNotify():
err = fmt.Errorf("received from etcdserver.Server.StopNotify")
}
if err != nil {
t.Fatal(err)
}
sv.etcd = srv
// issue linearized read to ensure leader election
cli := v3client.New(srv.Server)
_, err = cli.Get(context.Background(), "foo")
go func() {
defer close(sv.httpErrc)
if err := sv.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
sv.httpErrc <- err
return
}
sv.httpErrc <- nil
}()
return sv.httpErrc
}
// Stop stops etcd server, removing the data directory, and http server.
func (sv *Service) Stop(t *testing.T) {
defer os.RemoveAll(sv.dataDir)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
err := sv.httpServer.Shutdown(ctx)
cancel()
if err != nil && err != context.DeadlineExceeded {
t.Fatal(err)
}
err = <-sv.httpErrc
if err != nil {
t.Fatal(err)
}
sv.rootCancel()
sv.etcd.Close()
}
|