File: etcd.go

package info (click to toggle)
golang-github-xordataexchange-crypt 0.0.2%2Bgit20150523.17.749e360-4
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 152 kB
  • ctags: 93
  • sloc: makefile: 2
file content (84 lines) | stat: -rw-r--r-- 2,011 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package etcd

import (
	"errors"
	"time"

	"github.com/xordataexchange/crypt/backend"

	goetcd "github.com/coreos/go-etcd/etcd"
)

type Client struct {
	client    *goetcd.Client
	waitIndex uint64
}

func New(machines []string) (*Client, error) {
	return &Client{goetcd.NewClient(machines), 0}, nil
}

func (c *Client) Get(key string) ([]byte, error) {
	resp, err := c.client.Get(key, false, false)
	if err != nil {
		return nil, err
	}
	return []byte(resp.Node.Value), nil
}

func addKVPairs(node *goetcd.Node, list backend.KVPairs) backend.KVPairs {
	if node.Dir {
		for _, n := range node.Nodes {
			list = addKVPairs(n, list)
		}
		return list
	}
	return append(list, &backend.KVPair{Key: node.Key, Value: []byte(node.Value)})
}

func (c *Client) List(key string) (backend.KVPairs, error) {
	resp, err := c.client.Get(key, false, true)
	if err != nil {
		return nil, err
	}
	if !resp.Node.Dir {
		return nil, errors.New("key is not a directory")
	}
	list := addKVPairs(resp.Node, nil)
	return list, nil
}

func (c *Client) Set(key string, value []byte) error {
	_, err := c.client.Set(key, string(value), 0)
	return err
}

func (c *Client) Watch(key string, stop chan bool) <-chan *backend.Response {
	respChan := make(chan *backend.Response, 0)
	go func() {
		for {
			var resp *goetcd.Response
			var err error
			// if c.waitIndex == 0 {
			// 	resp, err = c.client.Get(key, false, false)
			// 	if err != nil {
			// 		respChan <- &backend.Response{nil, err}
			// 		time.Sleep(time.Second * 5)
			// 		continue
			// 	}
			// 	c.waitIndex = resp.EtcdIndex
			// 	respChan <- &backend.Response{[]byte(resp.Node.Value), nil}
			// }
			// resp, err = c.client.Watch(key, c.waitIndex+1, false, nil, stop)
			resp, err = c.client.Watch(key, 0, false, nil, stop)
			if err != nil {
				respChan <- &backend.Response{nil, err}
				time.Sleep(time.Second * 5)
				continue
			}
			c.waitIndex = resp.Node.ModifiedIndex
			respChan <- &backend.Response{[]byte(resp.Node.Value), nil}
		}
	}()
	return respChan
}