1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
|
package integration
import (
"os"
"os/exec"
"strconv"
"testing"
)
// IntegrationTest skips the calling test unless integration testing was
// explicitly requested and the required tooling can be run.
//
// The test is skipped when the WITH_INTEGRATION environment variable does not
// parse as a true boolean (see strconv.ParseBool; an unset or malformed value
// counts as false), or when hasDocker reports that the docker tooling is not
// available.
func IntegrationTest(t *testing.T) {
	// Attribute the skip to the calling test's line, not to this helper.
	t.Helper()

	// Check the opt-in flag first: it is a cheap environment lookup, while
	// hasDocker spawns external processes. The parse error is deliberately
	// ignored because ok is false in that case, which means "not requested".
	if ok, _ := strconv.ParseBool(os.Getenv("WITH_INTEGRATION")); !ok {
		t.Skip("Integration test. Set WITH_INTEGRATION=true to run.")
	}
	if !hasDocker() {
		t.Skip("Integration test. docker and/or docker-compose tools not available")
	}
}
// hasDocker reports whether both the docker and docker-compose commands can
// be run successfully with the --version flag. It returns false as soon as
// one of them fails to execute or exits with an error.
func hasDocker() bool {
	for _, tool := range [...]string{"docker", "docker-compose"} {
		if exec.Command(tool, "--version").Run() != nil {
			return false
		}
	}
	return true
}
// TestKafkaCluster starts a cluster created with
// NewKafkaCluster("kafka-docker/", clusterSize), verifies that KafkaAddrs
// returns exactly clusterSize addresses, and stops the cluster again.
//
// The test is skipped by IntegrationTest when integration testing is not
// enabled.
func TestKafkaCluster(t *testing.T) {
	IntegrationTest(t)

	const clusterSize = 4

	cluster := NewKafkaCluster("kafka-docker/", clusterSize)
	if err := cluster.Start(); err != nil {
		t.Fatalf("cannot start kafka cluster: %s", err)
	}
	// Stop the cluster on every exit path, including the t.Fatalf calls
	// below (Fatalf exits via runtime.Goexit, which runs deferred calls), so
	// a failing assertion does not leave the cluster running.
	defer func() {
		if err := cluster.Stop(); err != nil {
			t.Errorf("cannot stop kafka cluster: %s", err)
		}
	}()

	addrs, err := cluster.KafkaAddrs()
	if err != nil {
		t.Fatalf("cannot get kafka cluster addresses: %s", err)
	}
	if len(addrs) != clusterSize {
		t.Fatalf("expected %d addresses, got %d (%v)", clusterSize, len(addrs), addrs)
	}
}
// TestContainerRestart starts a cluster, stops every container individually
// (non-kafka containers first, then kafka containers), starts them all again
// in the same order, and finally stops the whole cluster.
//
// The test is skipped by IntegrationTest when integration testing is not
// enabled.
func TestContainerRestart(t *testing.T) {
	IntegrationTest(t)

	const (
		clusterSize = 4
		// One container per kafka node plus one for zookeeper.
		wantContainers = clusterSize + 1
	)

	cluster := NewKafkaCluster("kafka-docker/", clusterSize)
	if err := cluster.Start(); err != nil {
		t.Fatalf("cannot start kafka cluster: %s", err)
	}
	// Stop the cluster on every exit path, including the t.Fatalf calls
	// below (Fatalf exits via runtime.Goexit, which runs deferred calls), so
	// a failure halfway through does not leave containers running.
	defer func() {
		if err := cluster.Stop(); err != nil {
			t.Errorf("cannot stop kafka cluster: %s", err)
		}
	}()

	containers, err := cluster.Containers()
	if err != nil {
		t.Fatalf("cannot get containers info: %s", err)
	}
	if len(containers) != wantContainers {
		t.Fatalf("expected %d containers, got %d", wantContainers, len(containers))
	}

	// First stop all zookeeper (non-kafka) containers.
	for _, container := range containers {
		if container.RunningKafka() {
			continue
		}
		if err := container.Stop(); err != nil {
			t.Fatalf("cannot stop %q container: %s", container.ID, err)
		}
	}
	// Then stop all kafka containers.
	for _, container := range containers {
		if !container.RunningKafka() {
			continue
		}
		if err := container.Stop(); err != nil {
			t.Fatalf("cannot stop %q container: %s", container.ID, err)
		}
	}

	// Restart in the same order. Every container that was stopped above must
	// be started again, so iterate over the full slice rather than a
	// sub-slice that would skip the first container.
	// First start all zookeeper (non-kafka) containers.
	for _, container := range containers {
		if container.RunningKafka() {
			continue
		}
		if err := container.Start(); err != nil {
			t.Fatalf("cannot start %q container: %s", container.ID, err)
		}
	}
	// Then start all kafka containers.
	for _, container := range containers {
		if !container.RunningKafka() {
			continue
		}
		if err := container.Start(); err != nil {
			t.Fatalf("cannot start %q container: %s", container.ID, err)
		}
	}
}
|