File: cluster_test.go

package info (click to toggle)
golang-github-optiopay-kafka 2.1.1-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bullseye, forky, sid, trixie
  • size: 676 kB
  • sloc: sh: 163; makefile: 2
file content (113 lines) | stat: -rw-r--r-- 2,724 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package integration

import (
	"os"
	"os/exec"
	"strconv"
	"testing"
)

// IntegrationTest skips the calling test unless integration testing has been
// requested and the required tooling can be run. The test is skipped when the
// WITH_INTEGRATION environment variable does not parse as a true boolean, or
// when the docker and/or docker-compose commands are not available.
func IntegrationTest(t *testing.T) {
	// Report the skip at the caller's line instead of inside this helper.
	t.Helper()

	// The parse error is deliberately ignored: an unset or malformed value
	// yields ok == false, which means "skip". This cheap check runs first so
	// that no external process is spawned when integration tests were not
	// requested at all.
	if ok, _ := strconv.ParseBool(os.Getenv("WITH_INTEGRATION")); !ok {
		t.Skip("Integration test. Set WITH_INTEGRATION=true to run.")
	}

	if !hasDocker() {
		t.Skip("Integration test. docker and/or docker-compose tools not available")
	}
}

// hasDocker reports whether both the docker and the docker-compose commands
// can be run successfully with the --version flag. The commands are tried in
// that order and the first failure ends the check.
func hasDocker() bool {
	for _, tool := range []string{"docker", "docker-compose"} {
		if exec.Command(tool, "--version").Run() != nil {
			return false
		}
	}
	return true
}

// TestKafkaCluster starts a cluster created with NewKafkaCluster from the
// "kafka-docker/" directory and checks that KafkaAddrs returns exactly
// clusterSize addresses. The cluster is stopped again before the test returns,
// regardless of whether the assertions passed.
func TestKafkaCluster(t *testing.T) {
	IntegrationTest(t)

	const clusterSize = 4

	cluster := NewKafkaCluster("kafka-docker/", clusterSize)
	if err := cluster.Start(); err != nil {
		t.Fatalf("cannot start kafka cluster: %s", err)
	}
	// Stop the cluster even when a later t.Fatalf aborts the test; otherwise
	// the started containers would be left running. t.Fatalf exits via
	// runtime.Goexit, which still runs deferred calls. Errorf is used here
	// because the test body has already finished when this runs.
	defer func() {
		if err := cluster.Stop(); err != nil {
			t.Errorf("cannot stop kafka cluster: %s", err)
		}
	}()

	addrs, err := cluster.KafkaAddrs()
	if err != nil {
		t.Fatalf("cannot get kafka cluster addresses: %s", err)
	}
	if len(addrs) != clusterSize {
		t.Fatalf("expected %d addresses, got %d (%v)", clusterSize, len(addrs), addrs)
	}
}

// TestContainerRestart starts a cluster, stops every one of its containers and
// then starts every one of them again. In both phases the containers that do
// not report RunningKafka (zookeeper) are handled before the kafka ones. The
// cluster is stopped again before the test returns, regardless of whether the
// assertions passed.
func TestContainerRestart(t *testing.T) {
	IntegrationTest(t)

	cluster := NewKafkaCluster("kafka-docker/", 4)
	if err := cluster.Start(); err != nil {
		t.Fatalf("cannot start kafka cluster: %s", err)
	}
	// Stop the cluster even when a later t.Fatalf aborts the test; otherwise
	// the started containers would be left running. t.Fatalf exits via
	// runtime.Goexit, which still runs deferred calls. Errorf is used here
	// because the test body has already finished when this runs.
	defer func() {
		if err := cluster.Stop(); err != nil {
			t.Errorf("cannot stop kafka cluster: %s", err)
		}
	}()

	containers, err := cluster.Containers()
	if err != nil {
		t.Fatalf("cannot get containers info: %s", err)
	}
	// 4 kafka + zookeeper
	if len(containers) != 5 {
		t.Fatalf("expected 5 containers, got %d", len(containers))
	}

	// first stop all zookeeper containers
	for _, container := range containers {
		if container.RunningKafka() {
			continue
		}
		if err := container.Stop(); err != nil {
			t.Fatalf("cannot stop %q container: %s", container.ID, err)
		}
	}
	// then stop all kafka containers
	for _, container := range containers {
		if !container.RunningKafka() {
			continue
		}
		if err := container.Stop(); err != nil {
			t.Fatalf("cannot stop %q container: %s", container.ID, err)
		}
	}

	// first start all zookeeper containers; iterate over the full slice so
	// that every container stopped above is started again
	for _, container := range containers {
		if container.RunningKafka() {
			continue
		}
		if err := container.Start(); err != nil {
			t.Fatalf("cannot start %q container: %s", container.ID, err)
		}
	}
	// then start all kafka containers
	for _, container := range containers {
		if !container.RunningKafka() {
			continue
		}
		if err := container.Start(); err != nil {
			t.Fatalf("cannot start %q container: %s", container.ID, err)
		}
	}
}