File: cluster_integration_test.go

package info (click to toggle)
golang-gopkg-rethinkdb-rethinkdb-go.v6 6.2.1-5
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 2,736 kB
  • sloc: python: 1,382; makefile: 16; sh: 9
file content (126 lines) | stat: -rw-r--r-- 2,774 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// +build cluster
// +build integration

package tests

import (
	"time"

	test "gopkg.in/check.v1"
	r "gopkg.in/rethinkdb/rethinkdb-go.v6"
	"strconv"
	"strings"
)

// TestClusterDetectNewNode checks that a cluster seeded with two hosts and
// host discovery enabled ends up reporting at least three nodes.
//
// The test passes as soon as cluster.GetNodes() returns three or more nodes
// and fails if that has not happened within 30 seconds.
func (s *RethinkSuite) TestClusterDetectNewNode(c *test.C) {
	h1, p1 := splitAddress(url)
	h2, p2 := splitAddress(url2)
	hosts := []r.Host{r.NewHost(h1, p1), r.NewHost(h2, p2)}

	cluster, err := r.NewCluster(hosts, &r.ConnectOpts{
		Addresses:           []string{url, url2},
		DiscoverHosts:       true,
		NodeRefreshInterval: time.Second,
	})
	c.Assert(err, test.IsNil)
	// Release the cluster's connections once the test is over.
	defer cluster.Close()

	deadline := time.NewTimer(time.Second * 30)
	defer deadline.Stop()

	// Poll at a fixed interval rather than spinning: the node list is only
	// refreshed once per NodeRefreshInterval, so a tight loop would just burn
	// a CPU core for up to 30 seconds without detecting the node any sooner.
	poll := time.NewTicker(100 * time.Millisecond)
	defer poll.Stop()

	for {
		// Pass if another node was added
		if len(cluster.GetNodes()) >= 3 {
			return
		}

		select {
		// Fail if deadline has passed
		case <-deadline.C:
			c.Fatal("No node was added to the cluster")
		// Otherwise wait for the next poll and check again
		case <-poll.C:
		}
	}
}

// TestClusterRecoverAfterNoNodes checks that the cluster's node list can
// drop to zero and later become non-empty again.
//
// The test passes once cluster.GetNodes() has been observed empty and is
// subsequently observed with at least one node. It fails after 30 seconds,
// reporting which of the two phases was never reached.
//
// NOTE(review): nothing in this test removes or restores nodes; presumably
// the test environment does that externally — confirm.
func (s *RethinkSuite) TestClusterRecoverAfterNoNodes(c *test.C) {
	h1, p1 := splitAddress(url)
	h2, p2 := splitAddress(url2)
	hosts := []r.Host{r.NewHost(h1, p1), r.NewHost(h2, p2)}

	cluster, err := r.NewCluster(hosts, &r.ConnectOpts{
		Addresses:           []string{url, url2},
		DiscoverHosts:       true,
		NodeRefreshInterval: time.Second,
	})
	c.Assert(err, test.IsNil)
	// Release the cluster's connections once the test is over.
	defer cluster.Close()

	t := time.NewTimer(time.Second * 30)
	defer t.Stop()

	hasHadZeroNodes := false
	for {
		select {
		// Fail if deadline has passed
		case <-t.C:
			if !hasHadZeroNodes {
				c.Fatal("Cluster never dropped to zero nodes")
			}
			c.Fatal("No node was added to the cluster")
		default:
			// The loop is deliberately left unthrottled: sleeping between
			// polls could miss a short-lived zero-node window.
			// Take a single snapshot so both checks see the same state.
			nodeCount := len(cluster.GetNodes())

			if nodeCount == 0 {
				// Remember that the cluster has been empty at least once
				hasHadZeroNodes = true
			} else if hasHadZeroNodes {
				// Pass if a node came back after the cluster was empty
				return
			}
		}
	}
}

// TestClusterNodeHealth runs a trivial query (r.Expr(1)) against a
// three-address session in a tight loop for 30 seconds and then asserts that
// no more than 100 of those queries failed.
//
// Progress is logged once per second, and every failed query is logged with
// its error.
func (s *RethinkSuite) TestClusterNodeHealth(c *test.C) {
	session, err := r.Connect(r.ConnectOpts{
		Addresses:           []string{url1, url2, url3},
		DiscoverHosts:       true,
		NodeRefreshInterval: time.Second,
		InitialCap:          50,
		MaxOpen:             200,
	})
	c.Assert(err, test.IsNil)
	// Release the pooled connections once the test is over.
	defer session.Close()

	attempts := 0
	failed := 0
	seconds := 0

	t := time.NewTimer(time.Second * 30)
	defer t.Stop()
	tick := time.NewTicker(time.Second)
	defer tick.Stop()
	for {
		select {
		// Log progress once per second
		case <-tick.C:
			seconds++
			c.Logf("%ds elapsed", seconds)
		// Deadline reached: report the failure rate and check the limit
		case <-t.C:
			// Scale before dividing so the integer result is a percentage,
			// and guard against dividing by zero if no query ever ran.
			percent := 0
			if attempts > 0 {
				percent = failed * 100 / attempts
			}
			c.Logf("%d of the %d(%d%%) queries failed", failed, attempts, percent)
			// The limit is an absolute count of failed queries, not a ratio.
			c.Assert(failed <= 100, test.Equals, true)
			return
		default:
			attempts++
			if err := r.Expr(1).Exec(session); err != nil {
				c.Logf("Query failed, %s", err)
				failed++
			}
		}
	}
}

// splitAddress splits an address of the form "host:port" into its hostname
// and numeric port.
//
// Any part that is missing or unusable falls back to a default: the hostname
// defaults to "localhost" when the host part is empty, and the port defaults
// to 28015 when there is no port part or it is not a valid integer. Only the
// first two colon-separated fields are considered.
func splitAddress(address string) (hostname string, port int) {
	hostname = "localhost"
	port = 28015

	addrParts := strings.Split(address, ":")

	// strings.Split always returns at least one element, so test the content
	// of the host part rather than the slice length; otherwise "" or ":1234"
	// would replace the default hostname with an empty string.
	if addrParts[0] != "" {
		hostname = addrParts[0]
	}
	if len(addrParts) >= 2 {
		// Keep the default port when the port part is empty or malformed
		// instead of silently turning it into 0.
		if parsed, err := strconv.Atoi(addrParts[1]); err == nil {
			port = parsed
		}
	}

	return hostname, port
}