File: unified_runner_events_helper_test.go

package info (click to toggle)
golang-mongodb-mongo-driver 1.8.1%2Bds1-1
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 18,500 kB
  • sloc: perl: 533; ansic: 491; python: 432; makefile: 187; sh: 72
file content (156 lines) | stat: -rw-r--r-- 5,078 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package integration

import (
	"context"
	"sync"
	"time"

	"go.mongodb.org/mongo-driver/event"
	"go.mongodb.org/mongo-driver/internal/testutil/assert"
	"go.mongodb.org/mongo-driver/mongo/address"
	"go.mongodb.org/mongo-driver/mongo/description"
	"go.mongodb.org/mongo-driver/mongo/integration/mtest"
	"go.mongodb.org/mongo-driver/mongo/readpref"
	"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
)

// Helper functions for the operations in the unified spec test runner that require assertions about SDAM and connection
// pool events.

var (
	// poolEventTypesMap translates the pool event names that operations pass as their "event" argument into the
	// event type strings under which handlePoolEvent records counts. getPoolEventCount performs the lookup.
	poolEventTypesMap = map[string]string{
		"PoolClearedEvent": event.PoolCleared,
	}
	// defaultCallbackTimeout is the timeout that waitForEvent hands to assert.Soon.
	defaultCallbackTimeout = 10 * time.Second
)

// unifiedRunnerEventMonitor counts the connection pool and SDAM events that unified spec test operations wait for or
// assert on.
//
// poolEventCount maps a pool event type to the number of such events passed to handlePoolEvent and is guarded by
// poolEventCountLock. serverMarkedUnknownCount is the number of ServerDescriptionChangedEvents whose new description
// has kind Unknown; it is updated by the sdamMonitor callback installed in newUnifiedRunnerEventMonitor and is guarded
// by serverMarkedUnknownCountLock.
//
// The struct embeds mutexes by value, so it must be used through a pointer and never copied.
type unifiedRunnerEventMonitor struct {
	poolEventCount               map[string]int
	poolEventCountLock           sync.Mutex
	sdamMonitor                  *event.ServerMonitor
	serverMarkedUnknownCount     int
	serverMarkedUnknownCountLock sync.Mutex
}

// newUnifiedRunnerEventMonitor returns a monitor with an empty pool event counter and an SDAM monitor whose
// ServerDescriptionChanged callback counts transitions to description.Unknown.
func newUnifiedRunnerEventMonitor() *unifiedRunnerEventMonitor {
	monitor := &unifiedRunnerEventMonitor{
		poolEventCount: make(map[string]int),
	}

	onDescriptionChanged := func(evt *event.ServerDescriptionChangedEvent) {
		monitor.serverMarkedUnknownCountLock.Lock()
		defer monitor.serverMarkedUnknownCountLock.Unlock()

		// For the time being, spec tests only care about ServerDescriptionChangedEvents that mark a
		// server Unknown; every other change is ignored.
		if evt.NewDescription.Kind != description.Unknown {
			return
		}
		monitor.serverMarkedUnknownCount++
	}
	monitor.sdamMonitor = &event.ServerMonitor{
		ServerDescriptionChanged: onDescriptionChanged,
	}

	return monitor
}

// handlePoolEvent increments the counter for evt's type. It is suitable for use as the event handler of a connection
// pool monitor.
func (u *unifiedRunnerEventMonitor) handlePoolEvent(evt *event.PoolEvent) {
	eventType := evt.Type

	u.poolEventCountLock.Lock()
	defer u.poolEventCountLock.Unlock()
	u.poolEventCount[eventType]++
}

// getPoolEventCount reports how many pool events of the given type have been recorded, or 0 if there were none.
// eventType is translated through poolEventTypesMap before the lookup; a name missing from that map translates to the
// empty string.
func (u *unifiedRunnerEventMonitor) getPoolEventCount(eventType string) int {
	recordedType := poolEventTypesMap[eventType]

	u.poolEventCountLock.Lock()
	count := u.poolEventCount[recordedType]
	u.poolEventCountLock.Unlock()

	return count
}

// getServerMarkedUnknownCount reports how many ServerMarkedUnknown events have been recorded, or 0 if there were
// none.
func (u *unifiedRunnerEventMonitor) getServerMarkedUnknownCount() int {
	u.serverMarkedUnknownCountLock.Lock()
	count := u.serverMarkedUnknownCount
	u.serverMarkedUnknownCountLock.Unlock()

	return count
}

// waitForEvent blocks until the monitor of test has seen at least "count" events of type "event" (both read from
// op.Arguments). The count is polled every 100ms and the overall wait is bounded by passing defaultCallbackTimeout to
// assert.Soon.
func waitForEvent(mt *mtest.T, test *testCase, op *operation) {
	eventType := op.Arguments.Lookup("event").StringValue()
	expectedCount := int(op.Arguments.Lookup("count").Int32())

	// currentCount reads the counter that matches eventType. For the time being, ServerMarkedUnknown is the only
	// SDAM event spec tests wait for; any other name is treated as a pool event.
	currentCount := func() int {
		if eventType == "ServerMarkedUnknownEvent" {
			return test.monitor.getServerMarkedUnknownCount()
		}
		return test.monitor.getPoolEventCount(eventType)
	}

	callback := func() {
		// The loop only exits once the expected count is reached; the timeout is enforced by the caller of
		// callback, not here.
		for currentCount() < expectedCount {
			time.Sleep(100 * time.Millisecond)
		}
	}

	assert.Soon(mt, callback, defaultCallbackTimeout)
}

// assertEventCount asserts that the monitor of testCase has recorded exactly "count" events of type "event" (both
// read from op.Arguments).
func assertEventCount(mt *mtest.T, testCase *testCase, op *operation) {
	args := op.Arguments
	eventType := args.Lookup("event").StringValue()
	expectedCount := int(args.Lookup("count").Int32())

	var gotCount int
	switch eventType {
	case "ServerMarkedUnknownEvent":
		// For the time being, this is the only SDAM event spec tests assert on.
		gotCount = testCase.monitor.getServerMarkedUnknownCount()
	default:
		gotCount = testCase.monitor.getPoolEventCount(eventType)
	}

	assert.Equal(mt, expectedCount, gotCount, "expected count %d for event %s, got %d", expectedCount, eventType,
		gotCount)
}

// recordPrimary stores the address of the current primary in testCase.recordedPrimary. The address is looked up with
// failFast set, so getPrimaryAddress fails the test if the fast path yields no server.
func recordPrimary(mt *mtest.T, testCase *testCase) {
	primaryAddr := getPrimaryAddress(mt, testCase.testTopology, true)
	testCase.recordedPrimary = primaryAddr
}

// waitForPrimaryChange blocks until the address of the current primary differs from testCase.recordedPrimary. The
// overall wait is bounded by passing the "timeoutMS" value from op.Arguments to assert.Soon.
func waitForPrimaryChange(mt *mtest.T, testCase *testCase, op *operation) {
	// pollInterval is the pause between primary lookups. Without it the loop re-runs server selection in a tight
	// spin for as long as the primary is unchanged. The value matches the interval used by waitForEvent.
	const pollInterval = 100 * time.Millisecond

	callback := func() {
		for getPrimaryAddress(mt, testCase.testTopology, false) == testCase.recordedPrimary {
			time.Sleep(pollInterval)
		}
	}

	timeout := convertValueToMilliseconds(mt, op.Arguments.Lookup("timeoutMS"))
	assert.Soon(mt, callback, timeout)
}

// getPrimaryAddress selects a server with a primary read preference from topo and returns its address. When failFast
// is true the selection context is cancelled before SelectServer is called, so only the server selection fast path is
// used and the function fails the test if that path does not return a server.
func getPrimaryAddress(mt *mtest.T, topo *topology.Topology, failFast bool) address.Address {
	mt.Helper()

	ctx, cancel := context.WithCancel(mtest.Background)
	defer cancel()
	if failFast {
		// Cancel up front: the context is deliberately handed to SelectServer already cancelled.
		cancel()
	}

	primarySelector := description.ReadPrefSelector(readpref.Primary())
	server, err := topo.SelectServer(ctx, primarySelector)
	assert.Nil(mt, err, "SelectServer error: %v", err)

	selected := server.(*topology.SelectedServer)
	return selected.Description().Addr
}