1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276
|
package zk
import (
"errors"
"net"
"strings"
"time"
"github.com/go-zookeeper/zk"
"github.com/go-kit/log"
)
// DefaultACL is the default ACL to use for creating znodes.
var (
DefaultACL = zk.WorldACL(zk.PermAll)
ErrInvalidCredentials = errors.New("invalid credentials provided")
ErrClientClosed = errors.New("client service closed")
ErrNotRegistered = errors.New("not registered")
ErrNodeNotFound = errors.New("node not found")
)
const (
// DefaultConnectTimeout is the default timeout to establish a connection to
// a ZooKeeper node.
DefaultConnectTimeout = 2 * time.Second
// DefaultSessionTimeout is the default timeout to keep the current
// ZooKeeper session alive during a temporary disconnect.
DefaultSessionTimeout = 5 * time.Second
)
// Client is a wrapper around a lower level ZooKeeper client implementation.
type Client interface {
// GetEntries should query the provided path in ZooKeeper, place a watch on
// it and retrieve data from its current child nodes.
GetEntries(path string) ([]string, <-chan zk.Event, error)
// CreateParentNodes should try to create the path in case it does not exist
// yet on ZooKeeper.
CreateParentNodes(path string) error
// Register a service with ZooKeeper.
Register(s *Service) error
// Deregister a service with ZooKeeper.
Deregister(s *Service) error
// Stop should properly shutdown the client implementation
Stop()
}
type clientConfig struct {
logger log.Logger
acl []zk.ACL
credentials []byte
connectTimeout time.Duration
sessionTimeout time.Duration
rootNodePayload [][]byte
eventHandler func(zk.Event)
}
// Option functions enable friendly APIs.
type Option func(*clientConfig) error
type client struct {
*zk.Conn
clientConfig
active bool
quit chan struct{}
}
// ACL returns an Option specifying a non-default ACL for creating parent nodes.
func ACL(acl []zk.ACL) Option {
return func(c *clientConfig) error {
c.acl = acl
return nil
}
}
// Credentials returns an Option specifying a user/password combination which
// the client will use to authenticate itself with.
func Credentials(user, pass string) Option {
return func(c *clientConfig) error {
if user == "" || pass == "" {
return ErrInvalidCredentials
}
c.credentials = []byte(user + ":" + pass)
return nil
}
}
// ConnectTimeout returns an Option specifying a non-default connection timeout
// when we try to establish a connection to a ZooKeeper server.
func ConnectTimeout(t time.Duration) Option {
return func(c *clientConfig) error {
if t.Seconds() < 1 {
return errors.New("invalid connect timeout (minimum value is 1 second)")
}
c.connectTimeout = t
return nil
}
}
// SessionTimeout returns an Option specifying a non-default session timeout.
func SessionTimeout(t time.Duration) Option {
return func(c *clientConfig) error {
if t.Seconds() < 1 {
return errors.New("invalid session timeout (minimum value is 1 second)")
}
c.sessionTimeout = t
return nil
}
}
// Payload returns an Option specifying non-default data values for each znode
// created by CreateParentNodes.
func Payload(payload [][]byte) Option {
return func(c *clientConfig) error {
c.rootNodePayload = payload
return nil
}
}
// EventHandler returns an Option specifying a callback function to handle
// incoming zk.Event payloads (ZooKeeper connection events).
func EventHandler(handler func(zk.Event)) Option {
return func(c *clientConfig) error {
c.eventHandler = handler
return nil
}
}
// NewClient returns a ZooKeeper client with a connection to the server cluster.
// It will return an error if the server cluster cannot be resolved.
func NewClient(servers []string, logger log.Logger, options ...Option) (Client, error) {
defaultEventHandler := func(event zk.Event) {
logger.Log("eventtype", event.Type.String(), "server", event.Server, "state", event.State.String(), "err", event.Err)
}
config := clientConfig{
acl: DefaultACL,
connectTimeout: DefaultConnectTimeout,
sessionTimeout: DefaultSessionTimeout,
eventHandler: defaultEventHandler,
logger: logger,
}
for _, option := range options {
if err := option(&config); err != nil {
return nil, err
}
}
// dialer overrides the default ZooKeeper library Dialer so we can configure
// the connectTimeout. The current library has a hardcoded value of 1 second
// and there are reports of race conditions, due to slow DNS resolvers and
// other network latency issues.
dialer := func(network, address string, _ time.Duration) (net.Conn, error) {
return net.DialTimeout(network, address, config.connectTimeout)
}
conn, eventc, err := zk.Connect(servers, config.sessionTimeout, withLogger(logger), zk.WithDialer(dialer))
if err != nil {
return nil, err
}
if len(config.credentials) > 0 {
err = conn.AddAuth("digest", config.credentials)
if err != nil {
return nil, err
}
}
c := &client{conn, config, true, make(chan struct{})}
// Start listening for incoming Event payloads and callback the set
// eventHandler.
go func() {
for {
select {
case event := <-eventc:
config.eventHandler(event)
case <-c.quit:
return
}
}
}()
return c, nil
}
// CreateParentNodes implements the ZooKeeper Client interface.
func (c *client) CreateParentNodes(path string) error {
if !c.active {
return ErrClientClosed
}
if path[0] != '/' {
return zk.ErrInvalidPath
}
payload := []byte("")
pathString := ""
pathNodes := strings.Split(path, "/")
for i := 1; i < len(pathNodes); i++ {
if i <= len(c.rootNodePayload) {
payload = c.rootNodePayload[i-1]
} else {
payload = []byte("")
}
pathString += "/" + pathNodes[i]
_, err := c.Create(pathString, payload, 0, c.acl)
// not being able to create the node because it exists or not having
// sufficient rights is not an issue. It is ok for the node to already
// exist and/or us to only have read rights
if err != nil && err != zk.ErrNodeExists && err != zk.ErrNoAuth {
return err
}
}
return nil
}
// GetEntries implements the ZooKeeper Client interface.
func (c *client) GetEntries(path string) ([]string, <-chan zk.Event, error) {
// retrieve list of child nodes for given path and add watch to path
znodes, _, eventc, err := c.ChildrenW(path)
if err != nil {
return nil, eventc, err
}
var resp []string
for _, znode := range znodes {
// retrieve payload for child znode and add to response array
if data, _, err := c.Get(path + "/" + znode); err == nil {
resp = append(resp, string(data))
}
}
return resp, eventc, nil
}
// Register implements the ZooKeeper Client interface.
func (c *client) Register(s *Service) error {
if s.Path[len(s.Path)-1] != '/' {
s.Path += "/"
}
path := s.Path + s.Name
if err := c.CreateParentNodes(path); err != nil {
return err
}
if path[len(path)-1] != '/' {
path += "/"
}
node, err := c.CreateProtectedEphemeralSequential(path, s.Data, c.acl)
if err != nil {
return err
}
s.node = node
return nil
}
// Deregister implements the ZooKeeper Client interface.
func (c *client) Deregister(s *Service) error {
if s.node == "" {
return ErrNotRegistered
}
path := s.Path + s.Name
found, stat, err := c.Exists(path)
if err != nil {
return err
}
if !found {
return ErrNodeNotFound
}
if err := c.Delete(path, stat.Version); err != nil {
return err
}
return nil
}
// Stop implements the ZooKeeper Client interface.
func (c *client) Stop() {
c.active = false
close(c.quit)
c.Close()
}
|