File: broadcast_test.go

package info (click to toggle)
golang-github-docker-go-events 0.0~git20170721.0.9461782-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 144 kB
  • sloc: makefile: 2
file content (97 lines) | stat: -rw-r--r-- 2,159 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package events

import (
	"sync"
	"testing"
)

func TestBroadcaster(t *testing.T) {
	const nEvents = 1000
	var sinks []Sink
	b := NewBroadcaster()
	for i := 0; i < 10; i++ {
		sinks = append(sinks, newTestSink(t, nEvents))
		b.Add(sinks[i])
		b.Add(sinks[i]) // noop
	}

	var wg sync.WaitGroup
	for i := 1; i <= nEvents; i++ {
		wg.Add(1)
		go func(event Event) {
			if err := b.Write(event); err != nil {
				t.Fatalf("error writing event %v: %v", event, err)
			}
			wg.Done()
		}("event")
	}

	wg.Wait() // Wait until writes complete

	for i := range sinks {
		b.Remove(sinks[i])
	}

	// sending one more should trigger test failure if they weren't removed.
	if err := b.Write("onemore"); err != nil {
		t.Fatalf("unexpected error sending one more: %v", err)
	}

	// add them back to test closing.
	for i := range sinks {
		b.Add(sinks[i])
	}

	checkClose(t, b)

	// Iterate through the sinks and check that they all have the expected length.
	for _, sink := range sinks {
		ts := sink.(*testSink)
		ts.mu.Lock()
		defer ts.mu.Unlock()

		if len(ts.events) != nEvents {
			t.Fatalf("not all events ended up in testsink: len(testSink) == %d, not %d", len(ts.events), nEvents)
		}

		if !ts.closed {
			t.Fatalf("sink should have been closed")
		}
	}
}

func BenchmarkBroadcast10(b *testing.B) {
	benchmarkBroadcast(b, 10)
}

func BenchmarkBroadcast100(b *testing.B) {
	benchmarkBroadcast(b, 100)
}

func BenchmarkBroadcast1000(b *testing.B) {
	benchmarkBroadcast(b, 1000)
}

func BenchmarkBroadcast10000(b *testing.B) {
	benchmarkBroadcast(b, 10000)
}

func benchmarkBroadcast(b *testing.B, nsinks int) {
	// counter := metrics.NewCounter()
	// metrics.DefaultRegistry.Register(fmt.Sprintf("nsinks: %v", nsinks), counter)
	// go metrics.Log(metrics.DefaultRegistry, 500*time.Millisecond, log.New(os.Stderr, "metrics: ", log.LstdFlags))

	b.StopTimer()
	var sinks []Sink
	for i := 0; i < nsinks; i++ {
		// counter.Inc(1)
		sinks = append(sinks, newTestSink(b, b.N))
		// sinks = append(sinks, NewQueue(&testSink{t: b, expected: b.N}))
	}
	b.StartTimer()

	// meter := metered{}
	// NewQueue(meter.Egress(dst))

	benchmarkSink(b, NewBroadcaster(sinks...))
}