1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259
|
package etcdv3
import (
"context"
"crypto/tls"
"errors"
"time"
"go.etcd.io/etcd/client/pkg/v3/transport"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
)
var (
// ErrNoKey indicates a client method needs a key but receives none.
ErrNoKey = errors.New("no key provided")
// ErrNoValue indicates a client method needs a value but receives none.
ErrNoValue = errors.New("no value provided")
)
// Client is a wrapper around the etcd client.
type Client interface {
// GetEntries queries the given prefix in etcd and returns a slice
// containing the values of all keys found, recursively, underneath that
// prefix.
GetEntries(prefix string) ([]string, error)
// WatchPrefix watches the given prefix in etcd for changes. When a change
// is detected, it will signal on the passed channel. Clients are expected
// to call GetEntries to update themselves with the latest set of complete
// values. WatchPrefix will always send an initial sentinel value on the
// channel after establishing the watch, to ensure that clients always
// receive the latest set of values. WatchPrefix will block until the
// context passed to the NewClient constructor is terminated.
WatchPrefix(prefix string, ch chan struct{})
// Register a service with etcd.
Register(s Service) error
// Deregister a service with etcd.
Deregister(s Service) error
// LeaseID returns the lease id created for this service instance
LeaseID() int64
}
type client struct {
cli *clientv3.Client
ctx context.Context
kv clientv3.KV
// Watcher interface instance, used to leverage Watcher.Close()
watcher clientv3.Watcher
// watcher context
wctx context.Context
// watcher cancel func
wcf context.CancelFunc
// leaseID will be 0 (clientv3.NoLease) if a lease was not created
leaseID clientv3.LeaseID
hbch <-chan *clientv3.LeaseKeepAliveResponse
// Lease interface instance, used to leverage Lease.Close()
leaser clientv3.Lease
}
// ClientOptions defines options for the etcd client. All values are optional.
// If any duration is not specified, a default of 3 seconds will be used.
type ClientOptions struct {
Cert string
Key string
CACert string
DialTimeout time.Duration
DialKeepAlive time.Duration
// DialOptions is a list of dial options for the gRPC client (e.g., for interceptors).
// For example, pass grpc.WithBlock() to block until the underlying connection is up.
// Without this, Dial returns immediately and connecting the server happens in background.
DialOptions []grpc.DialOption
Username string
Password string
}
// NewClient returns Client with a connection to the named machines. It will
// return an error if a connection to the cluster cannot be made.
func NewClient(ctx context.Context, machines []string, options ClientOptions) (Client, error) {
if options.DialTimeout == 0 {
options.DialTimeout = 3 * time.Second
}
if options.DialKeepAlive == 0 {
options.DialKeepAlive = 3 * time.Second
}
var err error
var tlscfg *tls.Config
if options.Cert != "" && options.Key != "" {
tlsInfo := transport.TLSInfo{
CertFile: options.Cert,
KeyFile: options.Key,
TrustedCAFile: options.CACert,
}
tlscfg, err = tlsInfo.ClientConfig()
if err != nil {
return nil, err
}
}
cli, err := clientv3.New(clientv3.Config{
Context: ctx,
Endpoints: machines,
DialTimeout: options.DialTimeout,
DialKeepAliveTime: options.DialKeepAlive,
DialOptions: options.DialOptions,
TLS: tlscfg,
Username: options.Username,
Password: options.Password,
})
if err != nil {
return nil, err
}
return &client{
cli: cli,
ctx: ctx,
kv: clientv3.NewKV(cli),
}, nil
}
func (c *client) LeaseID() int64 { return int64(c.leaseID) }
// GetEntries implements the etcd Client interface.
func (c *client) GetEntries(key string) ([]string, error) {
resp, err := c.kv.Get(c.ctx, key, clientv3.WithPrefix())
if err != nil {
return nil, err
}
entries := make([]string, len(resp.Kvs))
for i, kv := range resp.Kvs {
entries[i] = string(kv.Value)
}
return entries, nil
}
// WatchPrefix implements the etcd Client interface.
func (c *client) WatchPrefix(prefix string, ch chan struct{}) {
c.wctx, c.wcf = context.WithCancel(c.ctx)
c.watcher = clientv3.NewWatcher(c.cli)
wch := c.watcher.Watch(c.wctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(0))
ch <- struct{}{}
for wr := range wch {
if wr.Canceled {
return
}
ch <- struct{}{}
}
}
func (c *client) Register(s Service) error {
var err error
if s.Key == "" {
return ErrNoKey
}
if s.Value == "" {
return ErrNoValue
}
if c.leaser != nil {
c.leaser.Close()
}
c.leaser = clientv3.NewLease(c.cli)
if c.watcher != nil {
c.watcher.Close()
}
c.watcher = clientv3.NewWatcher(c.cli)
if c.kv == nil {
c.kv = clientv3.NewKV(c.cli)
}
if s.TTL == nil {
s.TTL = NewTTLOption(time.Second*3, time.Second*10)
}
grantResp, err := c.leaser.Grant(c.ctx, int64(s.TTL.ttl.Seconds()))
if err != nil {
return err
}
c.leaseID = grantResp.ID
_, err = c.kv.Put(
c.ctx,
s.Key,
s.Value,
clientv3.WithLease(c.leaseID),
)
if err != nil {
return err
}
// this will keep the key alive 'forever' or until we revoke it or
// the context is canceled
c.hbch, err = c.leaser.KeepAlive(c.ctx, c.leaseID)
if err != nil {
return err
}
// discard the keepalive response, make etcd library not to complain
// fix bug #799
go func() {
for {
select {
case r := <-c.hbch:
// avoid dead loop when channel was closed
if r == nil {
return
}
case <-c.ctx.Done():
return
}
}
}()
return nil
}
func (c *client) Deregister(s Service) error {
defer c.close()
if s.Key == "" {
return ErrNoKey
}
if _, err := c.cli.Delete(c.ctx, s.Key, clientv3.WithIgnoreLease()); err != nil {
return err
}
return nil
}
// close will close any open clients and call
// the watcher cancel func
func (c *client) close() {
if c.leaser != nil {
c.leaser.Close()
}
if c.watcher != nil {
c.watcher.Close()
}
if c.wcf != nil {
c.wcf()
}
}
|