File: client.go

package info (click to toggle)
golang-github-go-kit-kit 0.6.0-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, buster, experimental, sid
  • size: 4,848 kB
  • sloc: sh: 65; makefile: 14
file content (273 lines) | stat: -rw-r--r-- 7,507 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
package zk

import (
	"errors"
	"net"
	"strings"
	"time"

	"github.com/samuel/go-zookeeper/zk"

	"github.com/go-kit/kit/log"
)

// DefaultACL is the default ACL to use for creating znodes.
//
// The remaining values are the sentinel errors returned by this file:
// ErrInvalidCredentials is returned by the Credentials option when the user
// or password is empty, ErrClientClosed by CreateParentNodes once the client
// has been stopped, ErrNotRegistered by Deregister for a service that holds no
// registered node, and ErrNodeNotFound by Deregister when the znode to delete
// does not exist.
var (
	DefaultACL            = zk.WorldACL(zk.PermAll)
	ErrInvalidCredentials = errors.New("invalid credentials provided")
	ErrClientClosed       = errors.New("client service closed")
	ErrNotRegistered      = errors.New("not registered")
	ErrNodeNotFound       = errors.New("node not found")
)

const (
	// DefaultConnectTimeout is the default timeout to establish a connection to
	// a ZooKeeper node. NewClient uses it unless the ConnectTimeout option
	// overrides it.
	DefaultConnectTimeout = 2 * time.Second
	// DefaultSessionTimeout is the default timeout to keep the current
	// ZooKeeper session alive during a temporary disconnect. NewClient uses it
	// unless the SessionTimeout option overrides it.
	DefaultSessionTimeout = 5 * time.Second
)

// Client is a wrapper around a lower level ZooKeeper client implementation.
type Client interface {
	// GetEntries should query the provided path in ZooKeeper, place a watch on
	// it and retrieve data from its current child nodes. It returns the child
	// payloads, the channel on which the watch event is delivered, and any
	// error encountered.
	GetEntries(path string) ([]string, <-chan zk.Event, error)
	// CreateParentNodes should try to create the path in case it does not exist
	// yet on ZooKeeper.
	CreateParentNodes(path string) error
	// Register registers a service with ZooKeeper.
	Register(s *Service) error
	// Deregister removes a previously registered service from ZooKeeper.
	Deregister(s *Service) error
	// Stop should properly shut down the client implementation.
	Stop()
}

// clientConfig holds the settings of a client. NewClient fills it with
// defaults and then lets each Option adjust it.
type clientConfig struct {
	logger          log.Logger     // logger handed to NewClient
	acl             []zk.ACL       // ACL applied to the znodes the client creates
	credentials     []byte         // "user:pass" digest auth data; empty means no auth
	connectTimeout  time.Duration  // dial timeout used when connecting to a server
	sessionTimeout  time.Duration  // session timeout passed to zk.Connect
	rootNodePayload [][]byte       // per-level znode data used by CreateParentNodes
	eventHandler    func(zk.Event) // callback invoked for each connection event
}

// Option is a function that adjusts the client configuration. NewClient
// applies the supplied options in order and aborts with the option's error as
// soon as one of them fails.
type Option func(*clientConfig) error

// client is the Client implementation returned by NewClient. It embeds the
// underlying ZooKeeper connection and the configuration it was built with.
type client struct {
	*zk.Conn
	clientConfig
	active bool          // true from NewClient until Stop; checked by CreateParentNodes
	quit   chan struct{} // closed by Stop to end the event-forwarding goroutine
}

// ACL returns an Option that replaces the default ACL. The configured ACL is
// the one the client passes along whenever it creates a znode.
func ACL(acl []zk.ACL) Option {
	apply := func(cfg *clientConfig) error {
		cfg.acl = acl
		return nil
	}
	return apply
}

// Credentials returns an Option that stores a user/password pair, joined as
// "user:pass", for the client to authenticate itself with. The Option fails
// with ErrInvalidCredentials when either value is empty.
func Credentials(user, pass string) Option {
	return func(cfg *clientConfig) error {
		if user != "" && pass != "" {
			cfg.credentials = []byte(user + ":" + pass)
			return nil
		}
		return ErrInvalidCredentials
	}
}

// ConnectTimeout returns an Option that overrides the default timeout used
// when establishing a connection to a ZooKeeper server. Durations shorter than
// one second are rejected with an error.
func ConnectTimeout(t time.Duration) Option {
	return func(cfg *clientConfig) error {
		if t >= time.Second {
			cfg.connectTimeout = t
			return nil
		}
		return errors.New("invalid connect timeout (minimum value is 1 second)")
	}
}

// SessionTimeout returns an Option that overrides the default session timeout.
// Durations shorter than one second are rejected with an error.
func SessionTimeout(t time.Duration) Option {
	return func(cfg *clientConfig) error {
		if t >= time.Second {
			cfg.sessionTimeout = t
			return nil
		}
		return errors.New("invalid session timeout (minimum value is 1 second)")
	}
}

// Payload returns an Option that sets the data written to the znodes created
// by CreateParentNodes: payload[0] is used for the first path element,
// payload[1] for the second, and so on. Path elements beyond len(payload) are
// created with empty data.
func Payload(payload [][]byte) Option {
	apply := func(cfg *clientConfig) error {
		cfg.rootNodePayload = payload
		return nil
	}
	return apply
}

// EventHandler returns an Option that installs handler as the callback for
// incoming zk.Event payloads (ZooKeeper connection events), replacing the
// default handler set up by NewClient.
func EventHandler(handler func(zk.Event)) Option {
	apply := func(cfg *clientConfig) error {
		cfg.eventHandler = handler
		return nil
	}
	return apply
}

// NewClient returns a ZooKeeper client with a connection to the server cluster.
//
// The configuration starts from the defaults (DefaultACL,
// DefaultConnectTimeout, DefaultSessionTimeout and an event handler that logs
// every event to logger) and is then adjusted by options, applied in order.
// A nil event handler falls back to the default logging handler.
//
// NewClient returns the error of the first failing option, the error reported
// by zk.Connect, or the error reported while registering the configured
// credentials. In the last case the freshly opened connection is closed before
// returning so that it is not leaked.
func NewClient(servers []string, logger log.Logger, options ...Option) (Client, error) {
	defaultEventHandler := func(event zk.Event) {
		logger.Log("eventtype", event.Type.String(), "server", event.Server, "state", event.State.String(), "err", event.Err)
	}
	config := clientConfig{
		acl:            DefaultACL,
		connectTimeout: DefaultConnectTimeout,
		sessionTimeout: DefaultSessionTimeout,
		eventHandler:   defaultEventHandler,
		logger:         logger,
	}
	for _, option := range options {
		if err := option(&config); err != nil {
			return nil, err
		}
	}
	// Calling a nil handler from the event goroutine below would panic, so
	// treat an explicitly nil handler as "use the default".
	if config.eventHandler == nil {
		config.eventHandler = defaultEventHandler
	}
	// dialer overrides the default ZooKeeper library Dialer so we can configure
	// the connectTimeout. The current library has a hardcoded value of 1 second
	// and there are reports of race conditions, due to slow DNS resolvers and
	// other network latency issues.
	dialer := func(network, address string, _ time.Duration) (net.Conn, error) {
		return net.DialTimeout(network, address, config.connectTimeout)
	}
	conn, eventc, err := zk.Connect(servers, config.sessionTimeout, withLogger(logger), zk.WithDialer(dialer))
	if err != nil {
		return nil, err
	}

	if len(config.credentials) > 0 {
		if err := conn.AddAuth("digest", config.credentials); err != nil {
			// The caller never receives a client it could Stop, so release
			// the connection here.
			conn.Close()
			return nil, err
		}
	}

	c := &client{conn, config, true, make(chan struct{})}

	// Start listening for incoming Event payloads and callback the set
	// eventHandler. The goroutine ends when Stop closes c.quit.
	go func() {
		for {
			select {
			case event := <-eventc:
				config.eventHandler(event)
			case <-c.quit:
				return
			}
		}
	}()
	return c, nil
}

// CreateParentNodes implements the ZooKeeper Client interface. It walks path
// from the root and issues a create for every element, using the configured
// ACL and, for the first len(rootNodePayload) elements, the configured
// payload; deeper elements are created with empty data.
//
// It returns ErrClientClosed once the client has been stopped and
// zk.ErrInvalidPath when path is empty or does not start with '/'. Create
// errors other than zk.ErrNodeExists and zk.ErrNoAuth are returned as is.
func (c *client) CreateParentNodes(path string) error {
	if !c.active {
		return ErrClientClosed
	}
	// HasPrefix also rejects the empty string, which indexing path[0] would
	// turn into a panic.
	if !strings.HasPrefix(path, "/") {
		return zk.ErrInvalidPath
	}
	pathString := ""
	pathNodes := strings.Split(path, "/")
	// pathNodes[0] is the empty string before the leading '/', so start at 1.
	for i := 1; i < len(pathNodes); i++ {
		payload := []byte("")
		if i <= len(c.rootNodePayload) {
			payload = c.rootNodePayload[i-1]
		}
		pathString += "/" + pathNodes[i]
		_, err := c.Create(pathString, payload, 0, c.acl)
		// not being able to create the node because it exists or not having
		// sufficient rights is not an issue. It is ok for the node to already
		// exist and/or us to only have read rights
		if err != nil && err != zk.ErrNodeExists && err != zk.ErrNoAuth {
			return err
		}
	}
	return nil
}

// GetEntries implements the ZooKeeper Client interface. It lists the children
// of path while setting a watch on it, then reads the data of every child.
// Children whose data cannot be read are left out of the result. The watch
// channel is returned in all cases, including when listing the children fails.
func (c *client) GetEntries(path string) ([]string, <-chan zk.Event, error) {
	children, _, watch, err := c.ChildrenW(path)
	if err != nil {
		return nil, watch, err
	}

	var entries []string
	for i := range children {
		payload, _, getErr := c.Get(path + "/" + children[i])
		if getErr != nil {
			// skip children we cannot read instead of failing the whole call
			continue
		}
		entries = append(entries, string(payload))
	}
	return entries, watch, nil
}

// Register implements the ZooKeeper Client interface. It normalizes s.Path to
// end in '/' (modifying s in place), makes sure the nodes of s.Path + s.Name
// exist via CreateParentNodes, and then creates a protected ephemeral
// sequential node for that path carrying s.Data. The path of the created node
// is remembered in s.node.
//
// Any error from CreateParentNodes or from the create call is returned
// unchanged, in which case s.node is left untouched.
func (c *client) Register(s *Service) error {
	// HasSuffix copes with an empty s.Path, where indexing the last byte
	// would panic.
	if !strings.HasSuffix(s.Path, "/") {
		s.Path += "/"
	}
	path := s.Path + s.Name
	if err := c.CreateParentNodes(path); err != nil {
		return err
	}
	node, err := c.CreateProtectedEphemeralSequential(path, s.Data, c.acl)
	if err != nil {
		return err
	}
	s.node = node
	return nil
}

// Deregister implements the ZooKeeper Client interface. It fails with
// ErrNotRegistered when s holds no registered node; otherwise it deletes the
// znode at s.Path + s.Name, returning ErrNodeNotFound if that znode does not
// exist.
//
// NOTE(review): the path deleted here is rebuilt from s.Path and s.Name rather
// than taken from s.node (the path stored by Register) — confirm this is
// intended.
func (c *client) Deregister(s *Service) error {
	if s.node == "" {
		return ErrNotRegistered
	}
	target := s.Path + s.Name
	exists, stat, err := c.Exists(target)
	switch {
	case err != nil:
		return err
	case !exists:
		return ErrNodeNotFound
	}
	// delete using the version that Exists just reported
	return c.Delete(target, stat.Version)
}

// Stop implements the ZooKeeper Client interface. It marks the client as
// inactive, closes the quit channel so the event-forwarding goroutine exits,
// and closes the underlying connection.
//
// Calling Stop on a client that is already stopped is a no-op; without the
// guard a second call would panic when closing the quit channel again. The
// active flag is a plain bool, so Stop must not be called concurrently with
// itself.
func (c *client) Stop() {
	if !c.active {
		return
	}
	c.active = false
	close(c.quit)
	c.Close()
}